Merge remote-tracking branch 'origin/main' into debug
This commit is contained in:
commit
80ee79e39b
|
@ -21,7 +21,7 @@
|
||||||
#include "fdbclient/FDBTypes.h"
|
#include "fdbclient/FDBTypes.h"
|
||||||
#include "flow/ProtocolVersion.h"
|
#include "flow/ProtocolVersion.h"
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#define FDB_INCLUDE_LEGACY_TYPES
|
#define FDB_INCLUDE_LEGACY_TYPES
|
||||||
|
|
||||||
#include "fdbclient/MultiVersionTransaction.h"
|
#include "fdbclient/MultiVersionTransaction.h"
|
||||||
|
@ -905,6 +905,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_transaction_get_committed_version(FDBTransa
|
||||||
CATCH_AND_RETURN(*out_version = TXN(tr)->getCommittedVersion(););
|
CATCH_AND_RETURN(*out_version = TXN(tr)->getCommittedVersion(););
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_total_cost(FDBTransaction* tr) {
|
||||||
|
return (FDBFuture*)TXN(tr)->getTotalCost().extractPtr();
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
|
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
|
||||||
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
|
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,10 +27,10 @@
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if !defined(FDB_API_VERSION)
|
#if !defined(FDB_API_VERSION)
|
||||||
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 720)
|
#error You must #define FDB_API_VERSION prior to including fdb_c.h (current version is 730)
|
||||||
#elif FDB_API_VERSION < 13
|
#elif FDB_API_VERSION < 13
|
||||||
#error API version no longer supported (upgrade to 13)
|
#error API version no longer supported (upgrade to 13)
|
||||||
#elif FDB_API_VERSION > 720
|
#elif FDB_API_VERSION > 730
|
||||||
#error Requested API version requires a newer version of this header
|
#error Requested API version requires a newer version of this header
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -514,12 +514,14 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_transaction_get_committed_version(F
|
||||||
int64_t* out_version);
|
int64_t* out_version);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This function intentionally returns an FDBFuture instead of an integer
|
* These functions intentionally return an FDBFuture instead of an integer
|
||||||
* directly, so that calling this API can see the effect of previous
|
* directly, so that calling the API can see the effect of previous
|
||||||
* mutations on the transaction. Specifically, mutations are applied
|
* mutations on the transaction. Specifically, mutations are applied
|
||||||
* asynchronously by the main thread. In order to see them, this call has to
|
* asynchronously by the main thread. In order to see them, this call has to
|
||||||
* be serviced by the main thread too.
|
* be serviced by the main thread too.
|
||||||
*/
|
*/
|
||||||
|
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_total_cost(FDBTransaction* tr);
|
||||||
|
|
||||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr);
|
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr);
|
||||||
|
|
||||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr);
|
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* tr);
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
|
|
||||||
namespace FdbApiTester {
|
namespace FdbApiTester {
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ namespace FdbApiTester {
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
#define API_VERSION_CLIENT_TMP_DIR 720
|
#define API_VERSION_CLIENT_TMP_DIR 730
|
||||||
|
|
||||||
enum TesterOptionId {
|
enum TesterOptionId {
|
||||||
OPT_CONNFILE,
|
OPT_CONNFILE,
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#include <foundationdb/fdb_c.h>
|
#include <foundationdb/fdb_c.h>
|
||||||
|
|
||||||
#include "unit/fdb_api.hpp"
|
#include "unit/fdb_api.hpp"
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#ifndef FDB_API_VERSION
|
#ifndef FDB_API_VERSION
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
#define MAKO_HPP
|
#define MAKO_HPP
|
||||||
|
|
||||||
#ifndef FDB_API_VERSION
|
#ifndef FDB_API_VERSION
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <array>
|
#include <array>
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
|
|
||||||
#ifndef FDB_API_VERSION
|
#ifndef FDB_API_VERSION
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <foundationdb/fdb_c.h>
|
#include <foundationdb/fdb_c.h>
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
// Unit tests that test the timeouts for a disconnected cluster
|
// Unit tests that test the timeouts for a disconnected cluster
|
||||||
|
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#include <foundationdb/fdb_c.h>
|
#include <foundationdb/fdb_c.h>
|
||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
|
|
@ -231,6 +231,10 @@ Int64Future Transaction::get_approximate_size() {
|
||||||
return Int64Future(fdb_transaction_get_approximate_size(tr_));
|
return Int64Future(fdb_transaction_get_approximate_size(tr_));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Int64Future Transaction::get_total_cost() {
|
||||||
|
return Int64Future(fdb_transaction_get_total_cost(tr_));
|
||||||
|
}
|
||||||
|
|
||||||
KeyFuture Transaction::get_versionstamp() {
|
KeyFuture Transaction::get_versionstamp() {
|
||||||
return KeyFuture(fdb_transaction_get_versionstamp(tr_));
|
return KeyFuture(fdb_transaction_get_versionstamp(tr_));
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#include <foundationdb/fdb_c.h>
|
#include <foundationdb/fdb_c.h>
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
@ -276,6 +276,9 @@ public:
|
||||||
// Returns a future which will be set to the approximate transaction size so far.
|
// Returns a future which will be set to the approximate transaction size so far.
|
||||||
Int64Future get_approximate_size();
|
Int64Future get_approximate_size();
|
||||||
|
|
||||||
|
// Returns a future which will be set tot the transaction's total cost so far.
|
||||||
|
Int64Future get_total_cost();
|
||||||
|
|
||||||
// Returns a future which will be set to the versionstamp which was used by
|
// Returns a future which will be set to the versionstamp which was used by
|
||||||
// any versionstamp operations in the transaction.
|
// any versionstamp operations in the transaction.
|
||||||
KeyFuture get_versionstamp();
|
KeyFuture get_versionstamp();
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
// Unit tests for API setup, network initialization functions from the FDB C API.
|
// Unit tests for API setup, network initialization functions from the FDB C API.
|
||||||
|
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#include <foundationdb/fdb_c.h>
|
#include <foundationdb/fdb_c.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
// Unit tests for the FoundationDB C API.
|
// Unit tests for the FoundationDB C API.
|
||||||
|
|
||||||
#include "fdb_c_options.g.h"
|
#include "fdb_c_options.g.h"
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#include <foundationdb/fdb_c.h>
|
#include <foundationdb/fdb_c.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
@ -1945,6 +1945,30 @@ TEST_CASE("fdb_transaction_get_committed_version") {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("fdb_transaction_get_total_cost") {
|
||||||
|
fdb::Transaction tr(db);
|
||||||
|
while (1) {
|
||||||
|
fdb::ValueFuture f1 = tr.get("foo", /*snapshot*/ false);
|
||||||
|
fdb_error_t err = wait_future(f1);
|
||||||
|
if (err) {
|
||||||
|
fdb::EmptyFuture fOnError = tr.on_error(err);
|
||||||
|
fdb_check(wait_future(fOnError));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
fdb::Int64Future f2 = tr.get_total_cost();
|
||||||
|
err = wait_future(f2);
|
||||||
|
if (err) {
|
||||||
|
fdb::EmptyFuture fOnError = tr.on_error(err);
|
||||||
|
fdb_check(wait_future(fOnError));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int64_t cost;
|
||||||
|
fdb_check(f2.get(&cost));
|
||||||
|
CHECK(cost > 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("fdb_transaction_get_approximate_size") {
|
TEST_CASE("fdb_transaction_get_approximate_size") {
|
||||||
fdb::Transaction tr(db);
|
fdb::Transaction tr(db);
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define FDB_API_VERSION 720
|
#define FDB_API_VERSION 730
|
||||||
#include "foundationdb/fdb_c.h"
|
#include "foundationdb/fdb_c.h"
|
||||||
#undef DLLEXPORT
|
#undef DLLEXPORT
|
||||||
#include "workloads.h"
|
#include "workloads.h"
|
||||||
|
|
|
@ -11,16 +11,16 @@ The global tag throttler bases throttling decisions on "quotas" provided by clie
|
||||||
The global tag throttler cannot throttle tags to a throughput below the reserved quota, and it cannot allow throughput to exceed the total quota.
|
The global tag throttler cannot throttle tags to a throughput below the reserved quota, and it cannot allow throughput to exceed the total quota.
|
||||||
|
|
||||||
### Cost
|
### Cost
|
||||||
Internally, the units for these quotas are "page costs", computed as follows. The "page cost" of a read operation is computed as:
|
Internally, the units for these quotas are bytes. The cost of an operation is rounded up to the nearest page size. The cost of a read operation is computed as:
|
||||||
|
|
||||||
```
|
```
|
||||||
readCost = ceiling(bytesRead / CLIENT_KNOBS->READ_COST_BYTE_FACTOR);
|
readCost = ceiling(bytesRead / CLIENT_KNOBS->READ_COST_BYTE_FACTOR) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||||
```
|
```
|
||||||
|
|
||||||
The "page cost" of a write operation is computed as:
|
The cost of a write operation is computed as:
|
||||||
|
|
||||||
```
|
```
|
||||||
writeCost = SERVER_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * ceiling(bytesWritten / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR);
|
writeCost = CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * ceiling(bytesWritten / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||||
```
|
```
|
||||||
|
|
||||||
Here `bytesWritten` includes cleared bytes. The size of range clears is estimated at commit time.
|
Here `bytesWritten` includes cleared bytes. The size of range clears is estimated at commit time.
|
||||||
|
@ -41,12 +41,6 @@ To set the quota through `fdbcli`, run:
|
||||||
fdbcli> quota set <tag> [reserved_throughput|total_throughput] <bytes_per_second>
|
fdbcli> quota set <tag> [reserved_throughput|total_throughput] <bytes_per_second>
|
||||||
```
|
```
|
||||||
|
|
||||||
Note that the quotas are specified in terms of bytes/second, and internally converted to page costs:
|
|
||||||
|
|
||||||
```
|
|
||||||
page_cost_quota = ceiling(byte_quota / CLIENT_KNOBS->READ_COST_BYTE_FACTOR)
|
|
||||||
```
|
|
||||||
|
|
||||||
To clear a both reserved and total throughput quotas for a tag, run:
|
To clear a both reserved and total throughput quotas for a tag, run:
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
|
@ -43,9 +43,9 @@ Optional<LimitType> parseLimitType(StringRef token) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<double> parseLimitValue(StringRef token) {
|
Optional<int64_t> parseLimitValue(StringRef token) {
|
||||||
try {
|
try {
|
||||||
return std::stod(token.toString());
|
return std::stol(token.toString());
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
@ -63,9 +63,9 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
|
||||||
} else {
|
} else {
|
||||||
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
|
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
|
||||||
if (limitType == LimitType::TOTAL) {
|
if (limitType == LimitType::TOTAL) {
|
||||||
fmt::print("{}\n", quota.totalQuota * CLIENT_KNOBS->READ_COST_BYTE_FACTOR);
|
fmt::print("{}\n", quota.totalQuota);
|
||||||
} else if (limitType == LimitType::RESERVED) {
|
} else if (limitType == LimitType::RESERVED) {
|
||||||
fmt::print("{}\n", quota.reservedQuota * CLIENT_KNOBS->READ_COST_BYTE_FACTOR);
|
fmt::print("{}\n", quota.reservedQuota);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Void();
|
return Void();
|
||||||
|
@ -75,7 +75,7 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, double value) {
|
ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, int64_t value) {
|
||||||
state Reference<ITransaction> tr = db->createTransaction();
|
state Reference<ITransaction> tr = db->createTransaction();
|
||||||
loop {
|
loop {
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
@ -89,9 +89,13 @@ ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
|
||||||
// Internally, costs are stored in terms of pages, but in the API,
|
// Internally, costs are stored in terms of pages, but in the API,
|
||||||
// costs are specified in terms of bytes
|
// costs are specified in terms of bytes
|
||||||
if (limitType == LimitType::TOTAL) {
|
if (limitType == LimitType::TOTAL) {
|
||||||
quota.totalQuota = (value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1;
|
// Round up to nearest page size
|
||||||
|
quota.totalQuota =
|
||||||
|
((value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||||
} else if (limitType == LimitType::RESERVED) {
|
} else if (limitType == LimitType::RESERVED) {
|
||||||
quota.reservedQuota = (value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1;
|
// Round up to nearest page size
|
||||||
|
quota.reservedQuota =
|
||||||
|
((value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||||
}
|
}
|
||||||
if (!quota.isValid()) {
|
if (!quota.isValid()) {
|
||||||
throw invalid_throttle_quota_value();
|
throw invalid_throttle_quota_value();
|
||||||
|
|
|
@ -232,10 +232,10 @@ void validateEncryptionHeaderDetails(const BlobGranuleFileEncryptionKeys& eKeys,
|
||||||
.detail("ExpectedHeaderSalt", header.cipherHeaderDetails.salt);
|
.detail("ExpectedHeaderSalt", header.cipherHeaderDetails.salt);
|
||||||
throw encrypt_header_metadata_mismatch();
|
throw encrypt_header_metadata_mismatch();
|
||||||
}
|
}
|
||||||
// Validate encryption header 'cipherHeader' details sanity
|
// Validate encryption header 'cipherText' details sanity
|
||||||
if (!(header.cipherHeaderDetails.baseCipherId == eKeys.headerCipherKey->getBaseCipherId() &&
|
if (!(header.cipherTextDetails.baseCipherId == eKeys.textCipherKey->getBaseCipherId() &&
|
||||||
header.cipherHeaderDetails.encryptDomainId == eKeys.headerCipherKey->getDomainId() &&
|
header.cipherTextDetails.encryptDomainId == eKeys.textCipherKey->getDomainId() &&
|
||||||
header.cipherHeaderDetails.salt == eKeys.headerCipherKey->getSalt())) {
|
header.cipherTextDetails.salt == eKeys.textCipherKey->getSalt())) {
|
||||||
TraceEvent(SevError, "EncryptionHeader_CipherTextMismatch")
|
TraceEvent(SevError, "EncryptionHeader_CipherTextMismatch")
|
||||||
.detail("TextDomainId", eKeys.textCipherKey->getDomainId())
|
.detail("TextDomainId", eKeys.textCipherKey->getDomainId())
|
||||||
.detail("ExpectedTextDomainId", header.cipherTextDetails.encryptDomainId)
|
.detail("ExpectedTextDomainId", header.cipherTextDetails.encryptDomainId)
|
||||||
|
|
|
@ -272,6 +272,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
||||||
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
|
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
|
||||||
init( WRITE_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) WRITE_COST_BYTE_FACTOR = 4096;
|
init( WRITE_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) WRITE_COST_BYTE_FACTOR = 4096;
|
||||||
init( READ_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) READ_COST_BYTE_FACTOR = 4096;
|
init( READ_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) READ_COST_BYTE_FACTOR = 4096;
|
||||||
|
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
|
||||||
|
|
||||||
// busyness reporting
|
// busyness reporting
|
||||||
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );
|
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );
|
||||||
|
|
|
@ -414,6 +414,20 @@ Version DLTransaction::getCommittedVersion() {
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<int64_t> DLTransaction::getTotalCost() {
|
||||||
|
if (!api->transactionGetTotalCost) {
|
||||||
|
return unsupported_operation();
|
||||||
|
}
|
||||||
|
|
||||||
|
FdbCApi::FDBFuture* f = api->transactionGetTotalCost(tr);
|
||||||
|
return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||||
|
int64_t size = 0;
|
||||||
|
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &size);
|
||||||
|
ASSERT(!error);
|
||||||
|
return size;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
|
ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
|
||||||
if (!api->transactionGetApproximateSize) {
|
if (!api->transactionGetApproximateSize) {
|
||||||
return unsupported_operation();
|
return unsupported_operation();
|
||||||
|
@ -950,6 +964,11 @@ void DLApi::init() {
|
||||||
fdbCPath,
|
fdbCPath,
|
||||||
"fdb_transaction_get_committed_version",
|
"fdb_transaction_get_committed_version",
|
||||||
headerVersion >= 0);
|
headerVersion >= 0);
|
||||||
|
loadClientFunction(&api->transactionGetTotalCost,
|
||||||
|
lib,
|
||||||
|
fdbCPath,
|
||||||
|
"fdb_transaction_get_total_cost",
|
||||||
|
headerVersion >= ApiVersion::withGetTotalCost().version());
|
||||||
loadClientFunction(&api->transactionGetApproximateSize,
|
loadClientFunction(&api->transactionGetApproximateSize,
|
||||||
lib,
|
lib,
|
||||||
fdbCPath,
|
fdbCPath,
|
||||||
|
@ -1486,6 +1505,12 @@ ThreadFuture<SpanContext> MultiVersionTransaction::getSpanContext() {
|
||||||
return SpanContext();
|
return SpanContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<int64_t> MultiVersionTransaction::getTotalCost() {
|
||||||
|
auto tr = getTransaction();
|
||||||
|
auto f = tr.transaction ? tr.transaction->getTotalCost() : makeTimeout<int64_t>();
|
||||||
|
return abortableFuture(f, tr.onChange);
|
||||||
|
}
|
||||||
|
|
||||||
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
|
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
|
||||||
auto tr = getTransaction();
|
auto tr = getTransaction();
|
||||||
auto f = tr.transaction ? tr.transaction->getApproximateSize() : makeTimeout<int64_t>();
|
auto f = tr.transaction ? tr.transaction->getApproximateSize() : makeTimeout<int64_t>();
|
||||||
|
|
|
@ -3456,6 +3456,8 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
|
||||||
}
|
}
|
||||||
trState->cx->getValueCompleted->latency = timer_int() - startTime;
|
trState->cx->getValueCompleted->latency = timer_int() - startTime;
|
||||||
trState->cx->getValueCompleted->log();
|
trState->cx->getValueCompleted->log();
|
||||||
|
trState->totalCost +=
|
||||||
|
getReadOperationCost(key.size() + (reply.value.present() ? reply.value.get().size() : 0));
|
||||||
|
|
||||||
if (getValueID.present()) {
|
if (getValueID.present()) {
|
||||||
g_traceBatch.addEvent("GetValueDebug",
|
g_traceBatch.addEvent("GetValueDebug",
|
||||||
|
@ -4285,6 +4287,7 @@ void getRangeFinished(Reference<TransactionState> trState,
|
||||||
RangeResultFamily result) {
|
RangeResultFamily result) {
|
||||||
int64_t bytes = getRangeResultFamilyBytes(result);
|
int64_t bytes = getRangeResultFamilyBytes(result);
|
||||||
|
|
||||||
|
trState->totalCost += getReadOperationCost(bytes);
|
||||||
trState->cx->transactionBytesRead += bytes;
|
trState->cx->transactionBytesRead += bytes;
|
||||||
trState->cx->transactionKeysRead += result.size();
|
trState->cx->transactionKeysRead += result.size();
|
||||||
|
|
||||||
|
@ -5767,6 +5770,7 @@ void Transaction::set(const KeyRef& key, const ValueRef& value, AddConflictRange
|
||||||
auto r = singleKeyRange(key, req.arena);
|
auto r = singleKeyRange(key, req.arena);
|
||||||
auto v = ValueRef(req.arena, value);
|
auto v = ValueRef(req.arena, value);
|
||||||
t.mutations.emplace_back(req.arena, MutationRef::SetValue, r.begin, v);
|
t.mutations.emplace_back(req.arena, MutationRef::SetValue, r.begin, v);
|
||||||
|
trState->totalCost += getWriteOperationCost(key.expectedSize() + value.expectedSize());
|
||||||
|
|
||||||
if (addConflictRange) {
|
if (addConflictRange) {
|
||||||
t.write_conflict_ranges.push_back(req.arena, r);
|
t.write_conflict_ranges.push_back(req.arena, r);
|
||||||
|
@ -5796,6 +5800,7 @@ void Transaction::atomicOp(const KeyRef& key,
|
||||||
auto v = ValueRef(req.arena, operand);
|
auto v = ValueRef(req.arena, operand);
|
||||||
|
|
||||||
t.mutations.emplace_back(req.arena, operationType, r.begin, v);
|
t.mutations.emplace_back(req.arena, operationType, r.begin, v);
|
||||||
|
trState->totalCost += getWriteOperationCost(key.expectedSize());
|
||||||
|
|
||||||
if (addConflictRange && operationType != MutationRef::SetVersionstampedKey)
|
if (addConflictRange && operationType != MutationRef::SetVersionstampedKey)
|
||||||
t.write_conflict_ranges.push_back(req.arena, r);
|
t.write_conflict_ranges.push_back(req.arena, r);
|
||||||
|
@ -5827,7 +5832,10 @@ void Transaction::clear(const KeyRangeRef& range, AddConflictRange addConflictRa
|
||||||
return;
|
return;
|
||||||
|
|
||||||
t.mutations.emplace_back(req.arena, MutationRef::ClearRange, r.begin, r.end);
|
t.mutations.emplace_back(req.arena, MutationRef::ClearRange, r.begin, r.end);
|
||||||
|
// NOTE: The throttling cost of each clear is assumed to be one page.
|
||||||
|
// This makes compuation fast, but can be inaccurate and may
|
||||||
|
// underestimate the cost of large clears.
|
||||||
|
trState->totalCost += CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||||
if (addConflictRange)
|
if (addConflictRange)
|
||||||
t.write_conflict_ranges.push_back(req.arena, r);
|
t.write_conflict_ranges.push_back(req.arena, r);
|
||||||
}
|
}
|
||||||
|
@ -6240,14 +6248,14 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Referen
|
||||||
state int i = 0;
|
state int i = 0;
|
||||||
|
|
||||||
for (; i < transaction->mutations.size(); ++i) {
|
for (; i < transaction->mutations.size(); ++i) {
|
||||||
auto* it = &transaction->mutations[i];
|
auto const& mutation = transaction->mutations[i];
|
||||||
|
|
||||||
if (it->type == MutationRef::Type::SetValue || it->isAtomicOp()) {
|
if (mutation.type == MutationRef::Type::SetValue || mutation.isAtomicOp()) {
|
||||||
trCommitCosts.opsCount++;
|
trCommitCosts.opsCount++;
|
||||||
trCommitCosts.writeCosts += getWriteOperationCost(it->expectedSize());
|
trCommitCosts.writeCosts += getWriteOperationCost(mutation.expectedSize());
|
||||||
} else if (it->type == MutationRef::Type::ClearRange) {
|
} else if (mutation.type == MutationRef::Type::ClearRange) {
|
||||||
trCommitCosts.opsCount++;
|
trCommitCosts.opsCount++;
|
||||||
keyRange = KeyRangeRef(it->param1, it->param2);
|
keyRange = KeyRangeRef(mutation.param1, mutation.param2);
|
||||||
if (trState->options.expensiveClearCostEstimation) {
|
if (trState->options.expensiveClearCostEstimation) {
|
||||||
StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY, trState));
|
StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY, trState));
|
||||||
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
|
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
|
||||||
|
|
|
@ -564,6 +564,10 @@ Version PaxosConfigTransaction::getCommittedVersion() const {
|
||||||
return impl->getCommittedVersion();
|
return impl->getCommittedVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t PaxosConfigTransaction::getTotalCost() const {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t PaxosConfigTransaction::getApproximateSize() const {
|
int64_t PaxosConfigTransaction::getApproximateSize() const {
|
||||||
return impl->getApproximateSize();
|
return impl->getApproximateSize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,6 +383,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||||
init( ROCKSDB_WRITER_THREAD_PRIORITY, 0 );
|
init( ROCKSDB_WRITER_THREAD_PRIORITY, 0 );
|
||||||
init( ROCKSDB_BACKGROUND_PARALLELISM, 4 );
|
init( ROCKSDB_BACKGROUND_PARALLELISM, 4 );
|
||||||
init( ROCKSDB_READ_PARALLELISM, 4 );
|
init( ROCKSDB_READ_PARALLELISM, 4 );
|
||||||
|
// If true, do not process and store RocksDB logs
|
||||||
|
init( ROCKSDB_MUTE_LOGS, false );
|
||||||
// Use a smaller memtable in simulation to avoid OOMs.
|
// Use a smaller memtable in simulation to avoid OOMs.
|
||||||
int64_t memtableBytes = isSimulated ? 32 * 1024 : 512 * 1024 * 1024;
|
int64_t memtableBytes = isSimulated ? 32 * 1024 : 512 * 1024 * 1024;
|
||||||
init( ROCKSDB_MEMTABLE_BYTES, memtableBytes );
|
init( ROCKSDB_MEMTABLE_BYTES, memtableBytes );
|
||||||
|
@ -732,7 +734,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||||
init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
|
init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
|
||||||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||||
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
|
|
||||||
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
||||||
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
||||||
init( PROXY_MAX_TAG_THROTTLE_DURATION, 5.0 ); if( randomize && BUGGIFY ) PROXY_MAX_TAG_THROTTLE_DURATION = 0.5;
|
init( PROXY_MAX_TAG_THROTTLE_DURATION, 5.0 ); if( randomize && BUGGIFY ) PROXY_MAX_TAG_THROTTLE_DURATION = 0.5;
|
||||||
|
@ -945,9 +946,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||||
init( ENCRYPTION_MODE, "AES-256-CTR" );
|
init( ENCRYPTION_MODE, "AES-256-CTR" );
|
||||||
init( SIM_KMS_MAX_KEYS, 4096 );
|
init( SIM_KMS_MAX_KEYS, 4096 );
|
||||||
init( ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH, 100000 );
|
init( ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH, 100000 );
|
||||||
init( ENABLE_TLOG_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION && !PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ) ENABLE_TLOG_ENCRYPTION = true;
|
init( ENABLE_TLOG_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION ) ENABLE_TLOG_ENCRYPTION = false;
|
||||||
init( ENABLE_STORAGE_SERVER_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY) ENABLE_STORAGE_SERVER_ENCRYPTION = !ENABLE_STORAGE_SERVER_ENCRYPTION;
|
init( ENABLE_STORAGE_SERVER_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION) ENABLE_STORAGE_SERVER_ENCRYPTION = false;
|
||||||
init( ENABLE_BLOB_GRANULE_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY) ENABLE_BLOB_GRANULE_ENCRYPTION = !ENABLE_BLOB_GRANULE_ENCRYPTION;
|
init( ENABLE_BLOB_GRANULE_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION) ENABLE_BLOB_GRANULE_ENCRYPTION = false;
|
||||||
|
|
||||||
// encrypt key proxy
|
// encrypt key proxy
|
||||||
init( ENABLE_BLOB_GRANULE_COMPRESSION, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_COMPRESSION = deterministicRandom()->coinflip(); }
|
init( ENABLE_BLOB_GRANULE_COMPRESSION, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_COMPRESSION = deterministicRandom()->coinflip(); }
|
||||||
|
|
|
@ -296,6 +296,10 @@ Version SimpleConfigTransaction::getCommittedVersion() const {
|
||||||
return impl->getCommittedVersion();
|
return impl->getCommittedVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t SimpleConfigTransaction::getTotalCost() const {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t SimpleConfigTransaction::getApproximateSize() const {
|
int64_t SimpleConfigTransaction::getApproximateSize() const {
|
||||||
return impl->getApproximateSize();
|
return impl->getApproximateSize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -626,6 +626,14 @@ ThreadFuture<SpanContext> ThreadSafeTransaction::getSpanContext() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ThreadFuture<int64_t> ThreadSafeTransaction::getTotalCost() {
|
||||||
|
ISingleThreadTransaction* tr = this->tr;
|
||||||
|
return onMainThread([tr]() -> Future<int64_t> {
|
||||||
|
tr->checkDeferredError();
|
||||||
|
return tr->getTotalCost();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
|
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
|
||||||
ISingleThreadTransaction* tr = this->tr;
|
ISingleThreadTransaction* tr = this->tr;
|
||||||
return onMainThread([tr]() -> Future<int64_t> {
|
return onMainThread([tr]() -> Future<int64_t> {
|
||||||
|
|
|
@ -262,6 +262,8 @@ public:
|
||||||
double TAG_THROTTLE_EXPIRATION_INTERVAL;
|
double TAG_THROTTLE_EXPIRATION_INTERVAL;
|
||||||
int64_t WRITE_COST_BYTE_FACTOR; // Used to round up the cost of write operations
|
int64_t WRITE_COST_BYTE_FACTOR; // Used to round up the cost of write operations
|
||||||
int64_t READ_COST_BYTE_FACTOR; // Used to round up the cost of read operations
|
int64_t READ_COST_BYTE_FACTOR; // Used to round up the cost of read operations
|
||||||
|
// Cost multiplier for writes (because write operations are more expensive than reads):
|
||||||
|
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
|
||||||
|
|
||||||
// busyness reporting
|
// busyness reporting
|
||||||
double BUSYNESS_SPIKE_START_THRESHOLD;
|
double BUSYNESS_SPIKE_START_THRESHOLD;
|
||||||
|
|
|
@ -120,6 +120,7 @@ public:
|
||||||
// later if they are not really needed.
|
// later if they are not really needed.
|
||||||
virtual ThreadFuture<VersionVector> getVersionVector() = 0;
|
virtual ThreadFuture<VersionVector> getVersionVector() = 0;
|
||||||
virtual ThreadFuture<SpanContext> getSpanContext() = 0;
|
virtual ThreadFuture<SpanContext> getSpanContext() = 0;
|
||||||
|
virtual ThreadFuture<int64_t> getTotalCost() = 0;
|
||||||
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
|
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
|
||||||
|
|
||||||
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
||||||
|
|
|
@ -101,6 +101,7 @@ public:
|
||||||
virtual Version getCommittedVersion() const = 0;
|
virtual Version getCommittedVersion() const = 0;
|
||||||
virtual VersionVector getVersionVector() const = 0;
|
virtual VersionVector getVersionVector() const = 0;
|
||||||
virtual SpanContext getSpanContext() const = 0;
|
virtual SpanContext getSpanContext() const = 0;
|
||||||
|
virtual int64_t getTotalCost() const = 0;
|
||||||
virtual int64_t getApproximateSize() const = 0;
|
virtual int64_t getApproximateSize() const = 0;
|
||||||
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;
|
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;
|
||||||
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
||||||
|
|
|
@ -377,6 +377,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
||||||
|
|
||||||
FDBFuture* (*transactionCommit)(FDBTransaction* tr);
|
FDBFuture* (*transactionCommit)(FDBTransaction* tr);
|
||||||
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction* tr, int64_t* outVersion);
|
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction* tr, int64_t* outVersion);
|
||||||
|
FDBFuture* (*transactionGetTotalCost)(FDBTransaction* tr);
|
||||||
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction* tr);
|
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction* tr);
|
||||||
FDBFuture* (*transactionWatch)(FDBTransaction* tr, uint8_t const* keyName, int keyNameLength);
|
FDBFuture* (*transactionWatch)(FDBTransaction* tr, uint8_t const* keyName, int keyNameLength);
|
||||||
FDBFuture* (*transactionOnError)(FDBTransaction* tr, fdb_error_t error);
|
FDBFuture* (*transactionOnError)(FDBTransaction* tr, fdb_error_t error);
|
||||||
|
@ -505,6 +506,7 @@ public:
|
||||||
Version getCommittedVersion() override;
|
Version getCommittedVersion() override;
|
||||||
ThreadFuture<VersionVector> getVersionVector() override;
|
ThreadFuture<VersionVector> getVersionVector() override;
|
||||||
ThreadFuture<SpanContext> getSpanContext() override { return SpanContext(); };
|
ThreadFuture<SpanContext> getSpanContext() override { return SpanContext(); };
|
||||||
|
ThreadFuture<int64_t> getTotalCost() override;
|
||||||
ThreadFuture<int64_t> getApproximateSize() override;
|
ThreadFuture<int64_t> getApproximateSize() override;
|
||||||
|
|
||||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||||
|
@ -732,6 +734,7 @@ public:
|
||||||
Version getCommittedVersion() override;
|
Version getCommittedVersion() override;
|
||||||
ThreadFuture<VersionVector> getVersionVector() override;
|
ThreadFuture<VersionVector> getVersionVector() override;
|
||||||
ThreadFuture<SpanContext> getSpanContext() override;
|
ThreadFuture<SpanContext> getSpanContext() override;
|
||||||
|
ThreadFuture<int64_t> getTotalCost() override;
|
||||||
ThreadFuture<int64_t> getApproximateSize() override;
|
ThreadFuture<int64_t> getApproximateSize() override;
|
||||||
|
|
||||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||||
|
|
|
@ -249,6 +249,9 @@ struct TransactionState : ReferenceCounted<TransactionState> {
|
||||||
SpanContext spanContext;
|
SpanContext spanContext;
|
||||||
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
|
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
|
||||||
bool readVersionObtainedFromGrvProxy;
|
bool readVersionObtainedFromGrvProxy;
|
||||||
|
// Measured by summing the bytes accessed by each read and write operation
|
||||||
|
// after rounding up to the nearest page size and applying a write penalty
|
||||||
|
int64_t totalCost = 0;
|
||||||
|
|
||||||
// Special flag to skip prepending tenant prefix to mutations and conflict ranges
|
// Special flag to skip prepending tenant prefix to mutations and conflict ranges
|
||||||
// when a dummy, internal transaction gets commited. The sole purpose of commitDummyTransaction() is to
|
// when a dummy, internal transaction gets commited. The sole purpose of commitDummyTransaction() is to
|
||||||
|
@ -447,6 +450,8 @@ public:
|
||||||
// May be called only after commit() returns success
|
// May be called only after commit() returns success
|
||||||
Version getCommittedVersion() const { return trState->committedVersion; }
|
Version getCommittedVersion() const { return trState->committedVersion; }
|
||||||
|
|
||||||
|
int64_t getTotalCost() const { return trState->totalCost; }
|
||||||
|
|
||||||
// Will be fulfilled only after commit() returns success
|
// Will be fulfilled only after commit() returns success
|
||||||
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp();
|
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp();
|
||||||
|
|
||||||
|
@ -566,9 +571,16 @@ ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
|
||||||
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
|
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
|
||||||
ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion> exclusions);
|
ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion> exclusions);
|
||||||
|
|
||||||
// Round up to the nearest page size
|
// Measured in bytes, rounded up to the nearest page size. Multiply by fungibility ratio
|
||||||
|
// because writes are more expensive than reads.
|
||||||
inline uint64_t getWriteOperationCost(uint64_t bytes) {
|
inline uint64_t getWriteOperationCost(uint64_t bytes) {
|
||||||
return (bytes - 1) / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR + 1;
|
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR *
|
||||||
|
((bytes - 1) / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Measured in bytes, rounded up to the nearest page size.
|
||||||
|
inline uint64_t getReadOperationCost(uint64_t bytes) {
|
||||||
|
return ((bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value
|
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value
|
||||||
|
|
|
@ -64,6 +64,7 @@ public:
|
||||||
void clear(KeyRef const&) override;
|
void clear(KeyRef const&) override;
|
||||||
Future<Void> commit() override;
|
Future<Void> commit() override;
|
||||||
Version getCommittedVersion() const override;
|
Version getCommittedVersion() const override;
|
||||||
|
int64_t getTotalCost() const override;
|
||||||
int64_t getApproximateSize() const override;
|
int64_t getApproximateSize() const override;
|
||||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||||
Future<Void> onError(Error const& e) override;
|
Future<Void> onError(Error const& e) override;
|
||||||
|
|
|
@ -149,6 +149,7 @@ public:
|
||||||
VersionVector getVersionVector() const override { return tr.getVersionVector(); }
|
VersionVector getVersionVector() const override { return tr.getVersionVector(); }
|
||||||
SpanContext getSpanContext() const override { return tr.getSpanContext(); }
|
SpanContext getSpanContext() const override { return tr.getSpanContext(); }
|
||||||
|
|
||||||
|
int64_t getTotalCost() const override { return tr.getTotalCost(); }
|
||||||
int64_t getApproximateSize() const override { return approximateSize; }
|
int64_t getApproximateSize() const override { return approximateSize; }
|
||||||
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;
|
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;
|
||||||
|
|
||||||
|
|
|
@ -316,6 +316,7 @@ public:
|
||||||
int64_t ROCKSDB_MEMTABLE_BYTES;
|
int64_t ROCKSDB_MEMTABLE_BYTES;
|
||||||
bool ROCKSDB_LEVEL_STYLE_COMPACTION;
|
bool ROCKSDB_LEVEL_STYLE_COMPACTION;
|
||||||
bool ROCKSDB_UNSAFE_AUTO_FSYNC;
|
bool ROCKSDB_UNSAFE_AUTO_FSYNC;
|
||||||
|
bool ROCKSDB_MUTE_LOGS;
|
||||||
int64_t ROCKSDB_PERIODIC_COMPACTION_SECONDS;
|
int64_t ROCKSDB_PERIODIC_COMPACTION_SECONDS;
|
||||||
int ROCKSDB_PREFIX_LEN;
|
int ROCKSDB_PREFIX_LEN;
|
||||||
int64_t ROCKSDB_BLOCK_CACHE_SIZE;
|
int64_t ROCKSDB_BLOCK_CACHE_SIZE;
|
||||||
|
@ -629,8 +630,6 @@ public:
|
||||||
double GLOBAL_TAG_THROTTLING_MIN_RATE;
|
double GLOBAL_TAG_THROTTLING_MIN_RATE;
|
||||||
// Used by global tag throttling counters
|
// Used by global tag throttling counters
|
||||||
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
|
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
|
||||||
// Cost multiplier for writes (because write operations are more expensive than reads)
|
|
||||||
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
|
|
||||||
// Maximum number of tags tracked by global tag throttler. Additional tags will be ignored
|
// Maximum number of tags tracked by global tag throttler. Additional tags will be ignored
|
||||||
// until some existing tags expire
|
// until some existing tags expire
|
||||||
int64_t GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED;
|
int64_t GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED;
|
||||||
|
@ -744,7 +743,6 @@ public:
|
||||||
int64_t MIN_TAG_READ_PAGES_RATE;
|
int64_t MIN_TAG_READ_PAGES_RATE;
|
||||||
int64_t MIN_TAG_WRITE_PAGES_RATE;
|
int64_t MIN_TAG_WRITE_PAGES_RATE;
|
||||||
double TAG_MEASUREMENT_INTERVAL;
|
double TAG_MEASUREMENT_INTERVAL;
|
||||||
int64_t READ_COST_BYTE_FACTOR;
|
|
||||||
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
|
bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
|
||||||
bool REPORT_DD_METRICS;
|
bool REPORT_DD_METRICS;
|
||||||
double DD_METRICS_REPORT_INTERVAL;
|
double DD_METRICS_REPORT_INTERVAL;
|
||||||
|
|
|
@ -76,6 +76,7 @@ public:
|
||||||
void reset() override;
|
void reset() override;
|
||||||
void debugTransaction(UID dID) override;
|
void debugTransaction(UID dID) override;
|
||||||
void checkDeferredError() const override;
|
void checkDeferredError() const override;
|
||||||
|
int64_t getTotalCost() const override;
|
||||||
int64_t getApproximateSize() const override;
|
int64_t getApproximateSize() const override;
|
||||||
void set(KeyRef const&, ValueRef const&) override;
|
void set(KeyRef const&, ValueRef const&) override;
|
||||||
void clear(KeyRangeRef const&) override { throw client_invalid_operation(); }
|
void clear(KeyRangeRef const&) override { throw client_invalid_operation(); }
|
||||||
|
|
|
@ -205,6 +205,7 @@ public:
|
||||||
Version getCommittedVersion() override;
|
Version getCommittedVersion() override;
|
||||||
ThreadFuture<VersionVector> getVersionVector() override;
|
ThreadFuture<VersionVector> getVersionVector() override;
|
||||||
ThreadFuture<SpanContext> getSpanContext() override;
|
ThreadFuture<SpanContext> getSpanContext() override;
|
||||||
|
ThreadFuture<int64_t> getTotalCost() override;
|
||||||
ThreadFuture<int64_t> getApproximateSize() override;
|
ThreadFuture<int64_t> getApproximateSize() override;
|
||||||
|
|
||||||
ThreadFuture<uint64_t> getProtocolVersion();
|
ThreadFuture<uint64_t> getProtocolVersion();
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
#include "fdbserver/ApplyMetadataMutation.h"
|
#include "fdbserver/ApplyMetadataMutation.h"
|
||||||
#include "fdbserver/EncryptionOpsUtils.h"
|
#include "fdbserver/EncryptionOpsUtils.h"
|
||||||
#include "fdbserver/IKeyValueStore.h"
|
#include "fdbserver/IKeyValueStore.h"
|
||||||
|
#include "fdbserver/Knobs.h"
|
||||||
#include "fdbserver/LogProtocolMessage.h"
|
#include "fdbserver/LogProtocolMessage.h"
|
||||||
#include "fdbserver/LogSystem.h"
|
#include "fdbserver/LogSystem.h"
|
||||||
#include "flow/Error.h"
|
#include "flow/Error.h"
|
||||||
|
@ -87,9 +88,10 @@ public:
|
||||||
|
|
||||||
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
|
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
|
||||||
ResolverData& resolverData_,
|
ResolverData& resolverData_,
|
||||||
const VectorRef<MutationRef>& mutations_)
|
const VectorRef<MutationRef>& mutations_,
|
||||||
|
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys_)
|
||||||
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
|
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
|
||||||
txnStateStore(resolverData_.txnStateStore), toCommit(resolverData_.toCommit),
|
cipherKeys(cipherKeys_), txnStateStore(resolverData_.txnStateStore), toCommit(resolverData_.toCommit),
|
||||||
confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem), popVersion(resolverData_.popVersion),
|
confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem), popVersion(resolverData_.popVersion),
|
||||||
keyInfo(resolverData_.keyInfo), storageCache(resolverData_.storageCache),
|
keyInfo(resolverData_.keyInfo), storageCache(resolverData_.storageCache),
|
||||||
initialCommit(resolverData_.initialCommit), forResolver(true) {}
|
initialCommit(resolverData_.initialCommit), forResolver(true) {}
|
||||||
|
@ -160,11 +162,13 @@ private:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void writeMutation(const MutationRef& m) {
|
void writeMutation(const MutationRef& m) {
|
||||||
if (forResolver || !isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
if (!isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||||
toCommit->writeTypedMessage(m);
|
toCommit->writeTypedMessage(m);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(cipherKeys != nullptr);
|
ASSERT(cipherKeys != nullptr);
|
||||||
Arena arena;
|
Arena arena;
|
||||||
|
CODE_PROBE(!forResolver, "encrypting metadata mutations");
|
||||||
|
CODE_PROBE(forResolver, "encrypting resolver mutations");
|
||||||
toCommit->writeTypedMessage(m.encryptMetadata(*cipherKeys, arena, BlobCipherMetrics::TLOG));
|
toCommit->writeTypedMessage(m.encryptMetadata(*cipherKeys, arena, BlobCipherMetrics::TLOG));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1343,8 +1347,9 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
||||||
|
|
||||||
void applyMetadataMutations(SpanContext const& spanContext,
|
void applyMetadataMutations(SpanContext const& spanContext,
|
||||||
ResolverData& resolverData,
|
ResolverData& resolverData,
|
||||||
const VectorRef<MutationRef>& mutations) {
|
const VectorRef<MutationRef>& mutations,
|
||||||
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply();
|
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys) {
|
||||||
|
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations, pCipherKeys).apply();
|
||||||
}
|
}
|
||||||
|
|
||||||
void applyMetadataMutations(SpanContext const& spanContext,
|
void applyMetadataMutations(SpanContext const& spanContext,
|
||||||
|
|
|
@ -2776,6 +2776,7 @@ ACTOR Future<Void> haltBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
|
||||||
if (bmData->iAmReplaced.canBeSet()) {
|
if (bmData->iAmReplaced.canBeSet()) {
|
||||||
bmData->iAmReplaced.send(Void());
|
bmData->iAmReplaced.send(Void());
|
||||||
}
|
}
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2896,6 +2897,7 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
|
||||||
if (bmData->iAmReplaced.canBeSet()) {
|
if (bmData->iAmReplaced.canBeSet()) {
|
||||||
bmData->iAmReplaced.send(Void());
|
bmData->iAmReplaced.send(Void());
|
||||||
}
|
}
|
||||||
|
throw blob_manager_replaced();
|
||||||
}
|
}
|
||||||
|
|
||||||
BoundaryEvaluation newEval(rep.continueEpoch,
|
BoundaryEvaluation newEval(rep.continueEpoch,
|
||||||
|
@ -5299,6 +5301,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
||||||
fmt::print("BM {} exiting because it is replaced\n", self->epoch);
|
fmt::print("BM {} exiting because it is replaced\n", self->epoch);
|
||||||
}
|
}
|
||||||
TraceEvent("BlobManagerReplaced", bmInterf.id()).detail("Epoch", epoch);
|
TraceEvent("BlobManagerReplaced", bmInterf.id()).detail("Epoch", epoch);
|
||||||
|
wait(delay(0.0));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
when(HaltBlobManagerRequest req = waitNext(bmInterf.haltBlobManager.getFuture())) {
|
when(HaltBlobManagerRequest req = waitNext(bmInterf.haltBlobManager.getFuture())) {
|
||||||
|
|
|
@ -1266,8 +1266,14 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
|
||||||
if (self->pProxyCommitData->isEncryptionEnabled) {
|
if (self->pProxyCommitData->isEncryptionEnabled) {
|
||||||
state EncryptCipherDomainId domainId = tenantId;
|
state EncryptCipherDomainId domainId = tenantId;
|
||||||
state MutationRef encryptedMutation;
|
state MutationRef encryptedMutation;
|
||||||
|
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::DISABLED,
|
||||||
|
"using disabled tenant mode");
|
||||||
|
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::OPTIONAL_TENANT,
|
||||||
|
"using optional tenant mode");
|
||||||
|
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::REQUIRED,
|
||||||
|
"using required tenant mode");
|
||||||
|
|
||||||
if (encryptedMutationOpt->present()) {
|
if (encryptedMutationOpt && encryptedMutationOpt->present()) {
|
||||||
CODE_PROBE(true, "using already encrypted mutation");
|
CODE_PROBE(true, "using already encrypted mutation");
|
||||||
encryptedMutation = encryptedMutationOpt->get();
|
encryptedMutation = encryptedMutationOpt->get();
|
||||||
ASSERT(encryptedMutation.isEncrypted());
|
ASSERT(encryptedMutation.isEncrypted());
|
||||||
|
@ -1299,6 +1305,8 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
|
||||||
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
|
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
|
||||||
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
|
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
|
||||||
}
|
}
|
||||||
|
ASSERT(encryptedMutation.isEncrypted());
|
||||||
|
CODE_PROBE(true, "encrypting non-metadata mutations");
|
||||||
self->toCommit.writeTypedMessage(encryptedMutation);
|
self->toCommit.writeTypedMessage(encryptedMutation);
|
||||||
return encryptedMutation;
|
return encryptedMutation;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1473,12 +1481,12 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||||
if (!hasCandidateBackupKeys) {
|
if (!hasCandidateBackupKeys) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m.type != MutationRef::Type::ClearRange) {
|
if (m.type != MutationRef::Type::ClearRange) {
|
||||||
// Add the mutation to the relevant backup tag
|
// Add the mutation to the relevant backup tag
|
||||||
for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
|
for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
|
||||||
// If encryption is enabled make sure the mutation we are writing is also encrypted
|
// If encryption is enabled make sure the mutation we are writing is also encrypted
|
||||||
ASSERT(!self->pProxyCommitData->isEncryptionEnabled || writtenMutation.isEncrypted());
|
ASSERT(!self->pProxyCommitData->isEncryptionEnabled || writtenMutation.isEncrypted());
|
||||||
|
CODE_PROBE(writtenMutation.isEncrypted(), "using encrypted backup mutation");
|
||||||
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, writtenMutation);
|
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, writtenMutation);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1500,6 +1508,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||||
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must be
|
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must be
|
||||||
// changed to account for clear ranges which span tenant boundaries
|
// changed to account for clear ranges which span tenant boundaries
|
||||||
if (self->pProxyCommitData->isEncryptionEnabled) {
|
if (self->pProxyCommitData->isEncryptionEnabled) {
|
||||||
|
CODE_PROBE(true, "encrypting clear range backup mutation");
|
||||||
if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 &&
|
if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 &&
|
||||||
encryptedMutation.present()) {
|
encryptedMutation.present()) {
|
||||||
backupMutation = encryptedMutation.get();
|
backupMutation = encryptedMutation.get();
|
||||||
|
@ -1510,6 +1519,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||||
backupMutation =
|
backupMutation =
|
||||||
backupMutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::BACKUP);
|
backupMutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::BACKUP);
|
||||||
}
|
}
|
||||||
|
ASSERT(backupMutation.isEncrypted());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the mutation to the relevant backup tag
|
// Add the mutation to the relevant backup tag
|
||||||
|
@ -1613,14 +1623,25 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
||||||
idempotencyIdSet.param2 = kv.value;
|
idempotencyIdSet.param2 = kv.value;
|
||||||
auto& tags = pProxyCommitData->tagsForKey(kv.key);
|
auto& tags = pProxyCommitData->tagsForKey(kv.key);
|
||||||
self->toCommit.addTags(tags);
|
self->toCommit.addTags(tags);
|
||||||
self->toCommit.writeTypedMessage(idempotencyIdSet);
|
if (self->pProxyCommitData->isEncryptionEnabled) {
|
||||||
|
CODE_PROBE(true, "encrypting idempotency mutation");
|
||||||
|
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
|
||||||
|
getEncryptDetailsFromMutationRef(self->pProxyCommitData, idempotencyIdSet);
|
||||||
|
Arena arena;
|
||||||
|
MutationRef encryptedMutation = idempotencyIdSet.encrypt(
|
||||||
|
self->cipherKeys, p.second, arena, BlobCipherMetrics::TLOG);
|
||||||
|
self->toCommit.writeTypedMessage(encryptedMutation);
|
||||||
|
} else {
|
||||||
|
self->toCommit.writeTypedMessage(idempotencyIdSet);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
state int i = 0;
|
||||||
for (const auto& m : pProxyCommitData->idempotencyClears) {
|
for (i = 0; i < pProxyCommitData->idempotencyClears.size(); i++) {
|
||||||
|
MutationRef& m = pProxyCommitData->idempotencyClears[i];
|
||||||
auto& tags = pProxyCommitData->tagsForKey(m.param1);
|
auto& tags = pProxyCommitData->tagsForKey(m.param1);
|
||||||
self->toCommit.addTags(tags);
|
self->toCommit.addTags(tags);
|
||||||
// TODO(nwijetunga): Encrypt these mutations
|
Arena arena;
|
||||||
self->toCommit.writeTypedMessage(m);
|
wait(success(writeMutation(self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &m, nullptr, &arena)));
|
||||||
}
|
}
|
||||||
pProxyCommitData->idempotencyClears = Standalone<VectorRef<MutationRef>>();
|
pProxyCommitData->idempotencyClears = Standalone<VectorRef<MutationRef>>();
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ struct StringBuffer {
|
||||||
StringBuffer(UID fromFileID) : reserved(0), id(fromFileID) {}
|
StringBuffer(UID fromFileID) : reserved(0), id(fromFileID) {}
|
||||||
|
|
||||||
int size() const { return str.size(); }
|
int size() const { return str.size(); }
|
||||||
StringRef& ref() { return str; }
|
Standalone<StringRef> get() { return str; }
|
||||||
void clear() {
|
void clear() {
|
||||||
str = Standalone<StringRef>();
|
str = Standalone<StringRef>();
|
||||||
reserved = 0;
|
reserved = 0;
|
||||||
|
@ -63,19 +63,19 @@ struct StringBuffer {
|
||||||
void clearReserve(int size) {
|
void clearReserve(int size) {
|
||||||
str = Standalone<StringRef>();
|
str = Standalone<StringRef>();
|
||||||
reserved = size;
|
reserved = size;
|
||||||
ref() = StringRef(new (str.arena()) uint8_t[size], 0);
|
str.contents() = StringRef(new (str.arena()) uint8_t[size], 0);
|
||||||
}
|
}
|
||||||
void append(StringRef x) { memcpy(append(x.size()), x.begin(), x.size()); }
|
void append(StringRef x) { memcpy(append(x.size()), x.begin(), x.size()); }
|
||||||
void* append(int bytes) {
|
void* append(int bytes) {
|
||||||
ASSERT(str.size() + bytes <= reserved);
|
ASSERT(str.size() + bytes <= reserved);
|
||||||
void* p = const_cast<uint8_t*>(str.end());
|
void* p = const_cast<uint8_t*>(str.end());
|
||||||
ref() = StringRef(str.begin(), str.size() + bytes);
|
str.contents() = StringRef(str.begin(), str.size() + bytes);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
StringRef pop_front(int bytes) {
|
StringRef pop_front(int bytes) {
|
||||||
ASSERT(bytes <= str.size());
|
ASSERT(bytes <= str.size());
|
||||||
StringRef result = str.substr(0, bytes);
|
StringRef result = str.substr(0, bytes);
|
||||||
ref() = str.substr(bytes);
|
str.contents() = str.substr(bytes);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
void alignReserve(int alignment, int size) {
|
void alignReserve(int alignment, int size) {
|
||||||
|
@ -101,7 +101,7 @@ struct StringBuffer {
|
||||||
if (str.size() > 0) {
|
if (str.size() > 0) {
|
||||||
memcpy(p, str.begin(), str.size());
|
memcpy(p, str.begin(), str.size());
|
||||||
}
|
}
|
||||||
ref() = StringRef(p, str.size());
|
str.contents() = StringRef(p, str.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -196,7 +196,7 @@ public:
|
||||||
stallCount.init("RawDiskQueue.StallCount"_sr);
|
stallCount.init("RawDiskQueue.StallCount"_sr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> pushAndCommit(StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
|
Future<Void> pushAndCommit(Standalone<StringRef> pageData, StringBuffer* pageMem, uint64_t poppedPages) {
|
||||||
return pushAndCommit(this, pageData, pageMem, poppedPages);
|
return pushAndCommit(this, pageData, pageMem, poppedPages);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,13 +332,13 @@ public:
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
Future<Future<Void>> push(StringRef pageData, std::vector<Reference<SyncQueue>>* toSync) {
|
Future<Future<Void>> push(Standalone<StringRef> pageData, std::vector<Reference<SyncQueue>>* toSync) {
|
||||||
return push(this, pageData, toSync);
|
return push(this, pageData, toSync);
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Future<Void>> push(RawDiskQueue_TwoFiles* self,
|
ACTOR static UNCANCELLABLE Future<Future<Void>> push(RawDiskQueue_TwoFiles* self,
|
||||||
StringRef pageData,
|
Standalone<StringRef> pageData,
|
||||||
std::vector<Reference<SyncQueue>>* toSync) {
|
std::vector<Reference<SyncQueue>>* toSync) {
|
||||||
// Write the given data (pageData) to the queue files, swapping or extending them if necessary.
|
// Write the given data (pageData) to the queue files, swapping or extending them if necessary.
|
||||||
// Don't do any syncs, but push the modified file(s) onto toSync.
|
// Don't do any syncs, but push the modified file(s) onto toSync.
|
||||||
ASSERT(self->readingFile == 2);
|
ASSERT(self->readingFile == 2);
|
||||||
|
@ -357,8 +357,9 @@ public:
|
||||||
toSync->push_back(self->files[1].syncQueue);
|
toSync->push_back(self->files[1].syncQueue);
|
||||||
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
|
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
|
||||||
.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
|
.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
|
||||||
waitfor.push_back(self->files[1].f->write(pageData.begin(), p, self->writingPos));
|
waitfor.push_back(uncancellable(
|
||||||
pageData = pageData.substr(p);
|
holdWhile(pageData, self->files[1].f->write(pageData.begin(), p, self->writingPos))));
|
||||||
|
pageData.contents() = pageData.substr(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
self->dbg_file0BeginSeq += self->files[0].size;
|
self->dbg_file0BeginSeq += self->files[0].size;
|
||||||
|
@ -426,7 +427,8 @@ public:
|
||||||
.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
|
.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
|
||||||
self->files[1].size = std::max(self->files[1].size, self->writingPos + pageData.size());
|
self->files[1].size = std::max(self->files[1].size, self->writingPos + pageData.size());
|
||||||
toSync->push_back(self->files[1].syncQueue);
|
toSync->push_back(self->files[1].syncQueue);
|
||||||
waitfor.push_back(self->files[1].f->write(pageData.begin(), pageData.size(), self->writingPos));
|
waitfor.push_back(uncancellable(
|
||||||
|
holdWhile(pageData, self->files[1].f->write(pageData.begin(), pageData.size(), self->writingPos))));
|
||||||
self->writingPos += pageData.size();
|
self->writingPos += pageData.size();
|
||||||
|
|
||||||
return waitForAllReadyThenThrow(waitfor);
|
return waitForAllReadyThenThrow(waitfor);
|
||||||
|
@ -435,7 +437,7 @@ public:
|
||||||
// Write the given data (pageData) to the queue files of self, sync data to disk, and delete the memory (pageMem)
|
// Write the given data (pageData) to the queue files of self, sync data to disk, and delete the memory (pageMem)
|
||||||
// that hold the pageData
|
// that hold the pageData
|
||||||
ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self,
|
ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self,
|
||||||
StringRef pageData,
|
Standalone<StringRef> pageData,
|
||||||
StringBuffer* pageMem,
|
StringBuffer* pageMem,
|
||||||
uint64_t poppedPages) {
|
uint64_t poppedPages) {
|
||||||
state Promise<Void> pushing, committed;
|
state Promise<Void> pushing, committed;
|
||||||
|
@ -983,7 +985,7 @@ public:
|
||||||
|
|
||||||
lastCommittedSeq = backPage().endSeq();
|
lastCommittedSeq = backPage().endSeq();
|
||||||
auto f = rawQueue->pushAndCommit(
|
auto f = rawQueue->pushAndCommit(
|
||||||
pushed_page_buffer->ref(), pushed_page_buffer, poppedSeq / sizeof(Page) - lastPoppedSeq / sizeof(Page));
|
pushed_page_buffer->get(), pushed_page_buffer, poppedSeq / sizeof(Page) - lastPoppedSeq / sizeof(Page));
|
||||||
lastPoppedSeq = poppedSeq;
|
lastPoppedSeq = poppedSeq;
|
||||||
pushed_page_buffer = 0;
|
pushed_page_buffer = 0;
|
||||||
return f;
|
return f;
|
||||||
|
@ -1179,7 +1181,7 @@ private:
|
||||||
Standalone<StringRef> pagedData = wait(readPages(self, start, end));
|
Standalone<StringRef> pagedData = wait(readPages(self, start, end));
|
||||||
const int startOffset = start % _PAGE_SIZE;
|
const int startOffset = start % _PAGE_SIZE;
|
||||||
const int dataLen = end - start;
|
const int dataLen = end - start;
|
||||||
ASSERT(pagedData.substr(startOffset, dataLen).compare(buffer->ref().substr(0, dataLen)) == 0);
|
ASSERT(pagedData.substr(startOffset, dataLen).compare(buffer->get().substr(0, dataLen)) == 0);
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
if (e.code() != error_code_io_error) {
|
if (e.code() != error_code_io_error) {
|
||||||
delete buffer;
|
delete buffer;
|
||||||
|
@ -1546,9 +1548,9 @@ private:
|
||||||
StringBuffer* pushed_page_buffer;
|
StringBuffer* pushed_page_buffer;
|
||||||
Page& backPage() {
|
Page& backPage() {
|
||||||
ASSERT(pushedPageCount());
|
ASSERT(pushedPageCount());
|
||||||
return ((Page*)pushed_page_buffer->ref().end())[-1];
|
return ((Page*)pushed_page_buffer->get().end())[-1];
|
||||||
}
|
}
|
||||||
Page const& backPage() const { return ((Page*)pushed_page_buffer->ref().end())[-1]; }
|
Page const& backPage() const { return ((Page*)pushed_page_buffer->get().end())[-1]; }
|
||||||
int pushedPageCount() const { return pushed_page_buffer ? pushed_page_buffer->size() / sizeof(Page) : 0; }
|
int pushedPageCount() const { return pushed_page_buffer ? pushed_page_buffer->size() / sizeof(Page) : 0; }
|
||||||
|
|
||||||
// Recovery state
|
// Recovery state
|
||||||
|
|
|
@ -107,7 +107,7 @@ class GlobalTagThrottlerImpl {
|
||||||
if (opType == OpType::READ) {
|
if (opType == OpType::READ) {
|
||||||
readCost.setTotal(newCost);
|
readCost.setTotal(newCost);
|
||||||
} else {
|
} else {
|
||||||
writeCost.setTotal(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * newCost);
|
writeCost.setTotal(CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * newCost);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -850,7 +850,7 @@ TEST_CASE("/GlobalTagThrottler/WriteThrottling") {
|
||||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||||
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::WRITE);
|
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::WRITE);
|
||||||
state Future<Void> monitor = monitorActor(&globalTagThrottler, [testTag](auto& gtt) {
|
state Future<Void> monitor = monitorActor(&globalTagThrottler, [testTag](auto& gtt) {
|
||||||
return targetRateIsNear(gtt, testTag, 100.0 / (6.0 * SERVER_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO));
|
return targetRateIsNear(gtt, testTag, 100.0 / (6.0 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO));
|
||||||
});
|
});
|
||||||
|
|
||||||
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||||
|
|
|
@ -48,6 +48,7 @@
|
||||||
#endif
|
#endif
|
||||||
#include "fdbclient/SystemData.h"
|
#include "fdbclient/SystemData.h"
|
||||||
#include "fdbserver/CoroFlow.h"
|
#include "fdbserver/CoroFlow.h"
|
||||||
|
#include "fdbserver/RocksDBLogForwarder.h"
|
||||||
#include "flow/ActorCollection.h"
|
#include "flow/ActorCollection.h"
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
#include "flow/IThreadPool.h"
|
#include "flow/IThreadPool.h"
|
||||||
|
@ -202,6 +203,13 @@ rocksdb::DBOptions SharedRocksDBState::initialDbOptions() {
|
||||||
options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);
|
options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);
|
||||||
|
|
||||||
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
|
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
|
||||||
|
|
||||||
|
if (SERVER_KNOBS->ROCKSDB_MUTE_LOGS) {
|
||||||
|
options.info_log = std::make_shared<NullRocksDBLogForwarder>();
|
||||||
|
} else {
|
||||||
|
options.info_log = std::make_shared<RocksDBLogForwarder>(id, options.info_log_level);
|
||||||
|
}
|
||||||
|
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,6 +205,17 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
||||||
req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress();
|
req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress();
|
||||||
state ProxyRequestsInfo& proxyInfo = self->proxyInfoMap[proxyAddress];
|
state ProxyRequestsInfo& proxyInfo = self->proxyInfoMap[proxyAddress];
|
||||||
|
|
||||||
|
state std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
|
||||||
|
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||||
|
static const std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> metadataDomains = {
|
||||||
|
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME },
|
||||||
|
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_ENCRYPT_HEADER_DOMAIN_NAME }
|
||||||
|
};
|
||||||
|
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cks =
|
||||||
|
wait(getLatestEncryptCipherKeys(db, metadataDomains, BlobCipherMetrics::TLOG));
|
||||||
|
cipherKeys = cks;
|
||||||
|
}
|
||||||
|
|
||||||
++self->resolveBatchIn;
|
++self->resolveBatchIn;
|
||||||
|
|
||||||
if (req.debugID.present()) {
|
if (req.debugID.present()) {
|
||||||
|
@ -351,7 +362,11 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
||||||
SpanContext spanContext =
|
SpanContext spanContext =
|
||||||
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
|
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
|
||||||
|
|
||||||
applyMetadataMutations(spanContext, *resolverData, req.transactions[t].mutations);
|
applyMetadataMutations(spanContext,
|
||||||
|
*resolverData,
|
||||||
|
req.transactions[t].mutations,
|
||||||
|
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) ? &cipherKeys
|
||||||
|
: nullptr);
|
||||||
}
|
}
|
||||||
CODE_PROBE(self->forceRecovery, "Resolver detects forced recovery");
|
CODE_PROBE(self->forceRecovery, "Resolver detects forced recovery");
|
||||||
}
|
}
|
||||||
|
@ -506,8 +521,10 @@ struct TransactionStateResolveContext {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext,
|
ACTOR Future<Void> processCompleteTransactionStateRequest(
|
||||||
Reference<AsyncVar<ServerDBInfo> const> db) {
|
TransactionStateResolveContext* pContext,
|
||||||
|
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||||
|
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys) {
|
||||||
state KeyRange txnKeys = allKeys;
|
state KeyRange txnKeys = allKeys;
|
||||||
state std::map<Tag, UID> tag_uid;
|
state std::map<Tag, UID> tag_uid;
|
||||||
|
|
||||||
|
@ -574,7 +591,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
|
||||||
bool confChanges; // Ignore configuration changes for initial commits.
|
bool confChanges; // Ignore configuration changes for initial commits.
|
||||||
ResolverData resolverData(
|
ResolverData resolverData(
|
||||||
pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges);
|
pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges);
|
||||||
applyMetadataMutations(SpanContext(), resolverData, mutations);
|
applyMetadataMutations(SpanContext(), resolverData, mutations, cipherKeys);
|
||||||
} // loop
|
} // loop
|
||||||
|
|
||||||
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
|
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
|
||||||
|
@ -615,7 +632,18 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
|
||||||
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
||||||
// Received all components of the txnStateRequest
|
// Received all components of the txnStateRequest
|
||||||
ASSERT(!pContext->processed);
|
ASSERT(!pContext->processed);
|
||||||
wait(processCompleteTransactionStateRequest(pContext, db));
|
state std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
|
||||||
|
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||||
|
static const std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> metadataDomains = {
|
||||||
|
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME },
|
||||||
|
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_ENCRYPT_HEADER_DOMAIN_NAME }
|
||||||
|
};
|
||||||
|
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cks =
|
||||||
|
wait(getLatestEncryptCipherKeys(db, metadataDomains, BlobCipherMetrics::TLOG));
|
||||||
|
cipherKeys = cks;
|
||||||
|
}
|
||||||
|
wait(processCompleteTransactionStateRequest(
|
||||||
|
pContext, db, isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) ? &cipherKeys : nullptr));
|
||||||
pContext->processed = true;
|
pContext->processed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
/*
|
||||||
|
* RocksDBLogForwarder.actor.cpp
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "fdbserver/RocksDBLogForwarder.h"
|
||||||
|
|
||||||
|
#include "flow/network.h"
|
||||||
|
#include "flow/Trace.h"
|
||||||
|
#include "fdbrpc/simulator.h"
|
||||||
|
|
||||||
|
#include "flow/actorcompiler.h" // This must be the last include file
|
||||||
|
|
||||||
|
using InfoLogLevel = rocksdb::InfoLogLevel;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
Severity getSeverityFromLogLevel(const InfoLogLevel& log_level) {
|
||||||
|
switch (log_level) {
|
||||||
|
case InfoLogLevel::DEBUG_LEVEL:
|
||||||
|
return SevDebug;
|
||||||
|
case InfoLogLevel::INFO_LEVEL:
|
||||||
|
return SevInfo;
|
||||||
|
case InfoLogLevel::WARN_LEVEL:
|
||||||
|
return SevWarn;
|
||||||
|
case InfoLogLevel::ERROR_LEVEL:
|
||||||
|
return SevError;
|
||||||
|
case InfoLogLevel::FATAL_LEVEL:
|
||||||
|
return SevError;
|
||||||
|
case InfoLogLevel::HEADER_LEVEL:
|
||||||
|
return SevVerbose;
|
||||||
|
case InfoLogLevel::NUM_INFO_LOG_LEVELS:
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
UNREACHABLE();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
namespace details {
|
||||||
|
|
||||||
|
void logTraceEvent(const RocksDBLogRecord& record) {
|
||||||
|
TraceEvent event = TraceEvent(record.severity, "RocksDBLogRecord", record.uid);
|
||||||
|
event.detail("RocksDBLogTime", record.logReceiveTime);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << record.threadID;
|
||||||
|
event.detail("RocksDBThreadID", ss.str());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& [k, v] : record.kvPairs) {
|
||||||
|
event.detail(k.c_str(), v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> rocksDBPeriodicallyLogger(RocksDBLogger* pRecords) {
|
||||||
|
loop choose {
|
||||||
|
when(wait(delay(0.1))) { pRecords->consume(); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RocksDBLogger::RocksDBLogger()
|
||||||
|
: mainThreadId(std::this_thread::get_id()), periodicLogger(rocksDBPeriodicallyLogger(this)) {}
|
||||||
|
|
||||||
|
void RocksDBLogger::inject(RocksDBLogRecord&& record) {
|
||||||
|
const std::thread::id threadId = std::this_thread::get_id();
|
||||||
|
if (threadId == mainThreadId) {
|
||||||
|
// In the main thread, it is *NOT* necessary to cache the record.
|
||||||
|
logTraceEvent(record);
|
||||||
|
|
||||||
|
consume();
|
||||||
|
} else {
|
||||||
|
const std::lock_guard<std::mutex> lockGuard(recordsMutex);
|
||||||
|
|
||||||
|
logRecords.emplace_back();
|
||||||
|
logRecords.back() = std::move(record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RocksDBLogger::consume() {
|
||||||
|
std::vector<RocksDBLogRecord> currentRecords;
|
||||||
|
{
|
||||||
|
const std::lock_guard<std::mutex> lockGuard(recordsMutex);
|
||||||
|
currentRecords.swap(logRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& record : currentRecords) {
|
||||||
|
logTraceEvent(record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace details
|
||||||
|
|
||||||
|
RocksDBLogForwarder::RocksDBLogForwarder(const UID& id_, const InfoLogLevel log_level)
|
||||||
|
: rocksdb::Logger(log_level), id(id_), logger() {
|
||||||
|
TraceEvent(SevInfo, "RocksDBLoggerStart", id);
|
||||||
|
}
|
||||||
|
|
||||||
|
RocksDBLogForwarder::~RocksDBLogForwarder() {
|
||||||
|
TraceEvent(SevInfo, "RocksDBLoggerStop", id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RocksDBLogForwarder::Logv(const char* format, va_list ap) {
|
||||||
|
Logv(InfoLogLevel::INFO_LEVEL, format, ap);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RocksDBLogForwarder::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
|
||||||
|
const std::thread::id threadID = std::this_thread::get_id();
|
||||||
|
|
||||||
|
// FIXME: Restrict the RocksDB log level to warn in order to prevent almost all simulation test failure. This has to
|
||||||
|
// be reconsidered.
|
||||||
|
const Severity severity = std::min(getSeverityFromLogLevel(log_level), SevWarn);
|
||||||
|
|
||||||
|
// TODO: Parse the log information into KV pairs
|
||||||
|
// At this stage vsnprintf is used
|
||||||
|
char buf[1024];
|
||||||
|
vsnprintf(buf, 1024, format, ap);
|
||||||
|
if (severity < SevError) {
|
||||||
|
logger.inject(details::RocksDBLogRecord{ now(), severity, id, threadID, { { "Text", std::string(buf) } } });
|
||||||
|
} else {
|
||||||
|
logger.inject(details::RocksDBLogRecord{
|
||||||
|
now(),
|
||||||
|
severity,
|
||||||
|
id,
|
||||||
|
threadID,
|
||||||
|
{ { "Text", std::string(buf) }, { "OriginalBacktrace", platform::get_backtrace() } } });
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
#include "fdbserver/Knobs.h"
|
#include "fdbserver/Knobs.h"
|
||||||
#include "fdbserver/TransactionTagCounter.h"
|
#include "fdbserver/TransactionTagCounter.h"
|
||||||
#include "flow/Trace.h"
|
#include "flow/Trace.h"
|
||||||
|
@ -90,9 +91,6 @@ class TransactionTagCounterImpl {
|
||||||
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
||||||
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||||
|
|
||||||
// Round up to the nearest page size
|
|
||||||
static int64_t costFunction(int64_t bytes) { return (bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
TransactionTagCounterImpl(UID thisServerID)
|
TransactionTagCounterImpl(UID thisServerID)
|
||||||
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
|
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
|
||||||
|
@ -101,7 +99,7 @@ public:
|
||||||
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||||
if (tags.present()) {
|
if (tags.present()) {
|
||||||
CODE_PROBE(true, "Tracking transaction tag in counter");
|
CODE_PROBE(true, "Tracking transaction tag in counter");
|
||||||
double cost = costFunction(bytes);
|
auto const cost = getReadOperationCost(bytes);
|
||||||
for (auto& tag : tags.get()) {
|
for (auto& tag : tags.get()) {
|
||||||
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
|
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
|
||||||
topTags.incrementCount(tag, count, cost);
|
topTags.incrementCount(tag, count, cost);
|
||||||
|
|
|
@ -144,6 +144,7 @@ inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
|
||||||
// Resolver's version
|
// Resolver's version
|
||||||
void applyMetadataMutations(SpanContext const& spanContext,
|
void applyMetadataMutations(SpanContext const& spanContext,
|
||||||
ResolverData& resolverData,
|
ResolverData& resolverData,
|
||||||
const VectorRef<MutationRef>& mutations);
|
const VectorRef<MutationRef>& mutations,
|
||||||
|
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
/*
|
||||||
|
* RocksDBLogForwarder.h
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef __ROCKSDB_LOG_FORWARDER_H__
|
||||||
|
#define __ROCKSDB_LOG_FORWARDER_H__
|
||||||
|
|
||||||
|
#include <cstdarg>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
#include <rocksdb/env.h>
|
||||||
|
|
||||||
|
#include "flow/genericactors.actor.h"
|
||||||
|
#include "flow/IRandom.h"
|
||||||
|
#include "flow/Trace.h"
|
||||||
|
|
||||||
|
namespace details {
|
||||||
|
|
||||||
|
// Stores a RocksDB log line, transformed into Key/Value pairs
|
||||||
|
struct RocksDBLogRecord {
|
||||||
|
double logReceiveTime;
|
||||||
|
Severity severity;
|
||||||
|
UID uid;
|
||||||
|
std::thread::id threadID;
|
||||||
|
std::vector<std::pair<std::string, std::string>> kvPairs;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Stores RocksDB log lines for furthur consumption.
|
||||||
|
// *NOTE* This logger *MUST* run in a thread that is able to generate TraceEvents, e.g. in the event loop thread.
|
||||||
|
class RocksDBLogger {
|
||||||
|
// The mutex that protects log records, as RocksDB is multi-threaded
|
||||||
|
std::mutex recordsMutex;
|
||||||
|
|
||||||
|
// Main thread ID. Only triggers TraceEvent when on main thread. In FDB only the main thread contains information
|
||||||
|
// that could thread.
|
||||||
|
const std::thread::id mainThreadId;
|
||||||
|
|
||||||
|
// The log record
|
||||||
|
std::vector<RocksDBLogRecord> logRecords;
|
||||||
|
|
||||||
|
// An ACTOR that logs the non-main thread data periodically
|
||||||
|
Future<Void> periodicLogger;
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Constructor
|
||||||
|
RocksDBLogger();
|
||||||
|
|
||||||
|
// *Moves* the record to internal records
|
||||||
|
void inject(RocksDBLogRecord&& record);
|
||||||
|
|
||||||
|
// Consumes all the records
|
||||||
|
void consume();
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace details
|
||||||
|
|
||||||
|
class NullRocksDBLogForwarder : public rocksdb::Logger {
|
||||||
|
public:
|
||||||
|
virtual void Logv(const char*, va_list) { /* intended to be blank */
|
||||||
|
}
|
||||||
|
virtual void Logv(const rocksdb::InfoLogLevel, const char*, va_list) { /* intended to be blank */
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class RocksDBLogForwarder : public rocksdb::Logger {
|
||||||
|
// The ID of the RocksDB instance
|
||||||
|
const UID id;
|
||||||
|
|
||||||
|
// The cache that stores the logs from RocksDB
|
||||||
|
details::RocksDBLogger logger;
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Constructor
|
||||||
|
// id is the UID of the logger
|
||||||
|
// log_level specifies the log level
|
||||||
|
explicit RocksDBLogForwarder(const UID& id,
|
||||||
|
const rocksdb::InfoLogLevel log_level = rocksdb::InfoLogLevel::INFO_LEVEL);
|
||||||
|
|
||||||
|
// Destructor
|
||||||
|
virtual ~RocksDBLogForwarder();
|
||||||
|
|
||||||
|
// Writes an entry to the log file
|
||||||
|
virtual void Logv(const char* format, va_list ap);
|
||||||
|
|
||||||
|
// Writes an entry to the log file, with a specificied log level
|
||||||
|
virtual void Logv(const rocksdb::InfoLogLevel log_level, const char* format, va_list ap);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif // __ROCKSDB_LOG_FORWARDER_H__
|
|
@ -2001,7 +2001,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
||||||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||||
}
|
}
|
||||||
|
|
||||||
data->transactionTagCounter.addRequest(req.tags, resultSize);
|
// Key size is not included in "BytesQueried", but still contributes to cost,
|
||||||
|
// so it must be accounted for here.
|
||||||
|
data->transactionTagCounter.addRequest(req.tags, req.key.size() + resultSize);
|
||||||
|
|
||||||
++data->counters.finishedQueries;
|
++data->counters.finishedQueries;
|
||||||
|
|
||||||
|
@ -3014,7 +3016,11 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
|
||||||
req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
|
req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
|
||||||
}
|
}
|
||||||
|
|
||||||
wait(delay(0, TaskPriority::DefaultEndpoint));
|
// Change feeds that are not atLatest must have a lower priority than UpdateStorage to not starve it out, and
|
||||||
|
// change feed disk reads generally only happen on blob worker recovery or data movement, so they should be
|
||||||
|
// lower priority. AtLatest change feeds are triggered directly from the SS update loop with no waits, so they
|
||||||
|
// will still be low latency
|
||||||
|
wait(delay(0, TaskPriority::SSSpilledChangeFeedReply));
|
||||||
|
|
||||||
if (DEBUG_CF_TRACE) {
|
if (DEBUG_CF_TRACE) {
|
||||||
TraceEvent(SevDebug, "TraceChangeFeedStreamStart", data->thisServerID)
|
TraceEvent(SevDebug, "TraceChangeFeedStreamStart", data->thisServerID)
|
||||||
|
@ -8567,6 +8573,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||||
} else {
|
} else {
|
||||||
MutationRef msg;
|
MutationRef msg;
|
||||||
cloneReader >> msg;
|
cloneReader >> msg;
|
||||||
|
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) && !msg.isEncrypted() &&
|
||||||
|
!(isSingleKeyMutation((MutationRef::Type)msg.type) &&
|
||||||
|
(backupLogKeys.contains(msg.param1) || (applyLogKeys.contains(msg.param1))))) {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
if (msg.isEncrypted()) {
|
if (msg.isEncrypted()) {
|
||||||
if (!cipherKeys.present()) {
|
if (!cipherKeys.present()) {
|
||||||
const BlobCipherEncryptHeader* header = msg.encryptionHeader();
|
const BlobCipherEncryptHeader* header = msg.encryptionHeader();
|
||||||
|
@ -8720,6 +8731,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||||
} else {
|
} else {
|
||||||
MutationRef msg;
|
MutationRef msg;
|
||||||
rd >> msg;
|
rd >> msg;
|
||||||
|
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) && !msg.isEncrypted() &&
|
||||||
|
!(isSingleKeyMutation((MutationRef::Type)msg.type) &&
|
||||||
|
(backupLogKeys.contains(msg.param1) || (applyLogKeys.contains(msg.param1))))) {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
if (msg.isEncrypted()) {
|
if (msg.isEncrypted()) {
|
||||||
ASSERT(cipherKeys.present());
|
ASSERT(cipherKeys.present());
|
||||||
msg = msg.decrypt(cipherKeys.get(), rd.arena(), BlobCipherMetrics::TLOG);
|
msg = msg.decrypt(cipherKeys.get(), rd.arena(), BlobCipherMetrics::TLOG);
|
||||||
|
|
|
@ -0,0 +1,323 @@
|
||||||
|
/*
|
||||||
|
* TransactionCost.actor.cpp
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
|
#include "flow/actorcompiler.h"
|
||||||
|
|
||||||
|
class TransactionCostWorkload : public TestWorkload {
|
||||||
|
int iterations{ 1000 };
|
||||||
|
Key prefix;
|
||||||
|
bool debugTransactions{ false };
|
||||||
|
|
||||||
|
Key getKey(uint64_t testNumber, uint64_t index = 0) const {
|
||||||
|
BinaryWriter bw(Unversioned());
|
||||||
|
bw << bigEndian64(testNumber);
|
||||||
|
bw << bigEndian64(index);
|
||||||
|
return bw.toValue().withPrefix(prefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Value getValue(uint32_t size) { return makeString(size); }
|
||||||
|
|
||||||
|
static UID getDebugID(uint64_t testNumber) { return UID(testNumber << 32, testNumber << 32); }
|
||||||
|
|
||||||
|
class ITest {
|
||||||
|
protected:
|
||||||
|
uint64_t testNumber;
|
||||||
|
explicit ITest(uint64_t testNumber) : testNumber(testNumber) {}
|
||||||
|
|
||||||
|
public:
|
||||||
|
void debugTransaction(ReadYourWritesTransaction& tr) { tr.debugTransaction(getDebugID(testNumber)); }
|
||||||
|
virtual Future<Void> setup(TransactionCostWorkload const& workload, Database const&) { return Void(); }
|
||||||
|
virtual Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction>) = 0;
|
||||||
|
virtual int64_t expectedFinalCost() const = 0;
|
||||||
|
virtual ~ITest() = default;
|
||||||
|
};
|
||||||
|
|
||||||
|
class ReadEmptyTest : public ITest {
|
||||||
|
public:
|
||||||
|
explicit ReadEmptyTest(uint64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
return success(tr->get(workload.getKey(testNumber)));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||||
|
};
|
||||||
|
|
||||||
|
class ReadLargeValueTest : public ITest {
|
||||||
|
ACTOR static Future<Void> setup(TransactionCostWorkload const* workload,
|
||||||
|
ReadLargeValueTest* self,
|
||||||
|
Database cx) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr.set(workload->getKey(self->testNumber), getValue(CLIENT_KNOBS->READ_COST_BYTE_FACTOR));
|
||||||
|
wait(tr.commit());
|
||||||
|
return Void();
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit ReadLargeValueTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||||
|
return setup(&workload, this, cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
return success(tr->get(workload.getKey(testNumber)));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override { return 2 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||||
|
};
|
||||||
|
|
||||||
|
class WriteTest : public ITest {
|
||||||
|
public:
|
||||||
|
explicit WriteTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
tr->set(workload.getKey(testNumber), getValue(20));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override {
|
||||||
|
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class WriteLargeValueTest : public ITest {
|
||||||
|
public:
|
||||||
|
explicit WriteLargeValueTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
tr->set(workload.getKey(testNumber), getValue(CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override {
|
||||||
|
return 2 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class WriteMultipleValuesTest : public ITest {
|
||||||
|
public:
|
||||||
|
explicit WriteMultipleValuesTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
tr->set(workload.getKey(testNumber, i), getValue(20));
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override {
|
||||||
|
return 10 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class ClearTest : public ITest {
|
||||||
|
public:
|
||||||
|
explicit ClearTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
tr->clear(singleKeyRange(workload.getKey(testNumber)));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR; }
|
||||||
|
};
|
||||||
|
|
||||||
|
class ReadRangeTest : public ITest {
|
||||||
|
ACTOR static Future<Void> setup(ReadRangeTest* self, TransactionCostWorkload const* workload, Database cx) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
tr.set(workload->getKey(self->testNumber, i), workload->getValue(20));
|
||||||
|
}
|
||||||
|
wait(tr.commit());
|
||||||
|
return Void();
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit ReadRangeTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||||
|
return setup(this, &workload, cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
KeyRange const keys = KeyRangeRef(workload.getKey(testNumber, 0), workload.getKey(testNumber, 10));
|
||||||
|
return success(tr->getRange(keys, 10));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||||
|
};
|
||||||
|
|
||||||
|
class ReadMultipleValuesTest : public ITest {
|
||||||
|
ACTOR static Future<Void> setup(ReadMultipleValuesTest* self,
|
||||||
|
TransactionCostWorkload const* workload,
|
||||||
|
Database cx) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
tr.set(workload->getKey(self->testNumber, i), workload->getValue(20));
|
||||||
|
}
|
||||||
|
wait(tr.commit());
|
||||||
|
return Void();
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit ReadMultipleValuesTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||||
|
return setup(this, &workload, cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
std::vector<Future<Void>> futures;
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
futures.push_back(success(tr->get(workload.getKey(testNumber, i))));
|
||||||
|
}
|
||||||
|
return waitForAll(futures);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override { return 10 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||||
|
};
|
||||||
|
|
||||||
|
class LargeReadRangeTest : public ITest {
|
||||||
|
ACTOR static Future<Void> setup(LargeReadRangeTest* self,
|
||||||
|
TransactionCostWorkload const* workload,
|
||||||
|
Database cx) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
tr.set(workload->getKey(self->testNumber, i),
|
||||||
|
workload->getValue(CLIENT_KNOBS->READ_COST_BYTE_FACTOR));
|
||||||
|
}
|
||||||
|
wait(tr.commit());
|
||||||
|
return Void();
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit LargeReadRangeTest(int64_t testNumber) : ITest(testNumber) {}
|
||||||
|
|
||||||
|
Future<Void> setup(TransactionCostWorkload const& workload, Database const& cx) override {
|
||||||
|
return setup(this, &workload, cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
|
||||||
|
KeyRange const keys = KeyRangeRef(workload.getKey(testNumber, 0), workload.getKey(testNumber, 10));
|
||||||
|
return success(tr->getRange(keys, 10));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t expectedFinalCost() const override { return 11 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
|
||||||
|
};
|
||||||
|
|
||||||
|
static std::unique_ptr<ITest> createRandomTest(int64_t testNumber) {
|
||||||
|
auto const rand = deterministicRandom()->randomInt(0, 9);
|
||||||
|
if (rand == 0) {
|
||||||
|
return std::make_unique<ReadEmptyTest>(testNumber);
|
||||||
|
} else if (rand == 1) {
|
||||||
|
return std::make_unique<ReadLargeValueTest>(testNumber);
|
||||||
|
} else if (rand == 2) {
|
||||||
|
return std::make_unique<ReadMultipleValuesTest>(testNumber);
|
||||||
|
} else if (rand == 3) {
|
||||||
|
return std::make_unique<WriteTest>(testNumber);
|
||||||
|
} else if (rand == 4) {
|
||||||
|
return std::make_unique<WriteLargeValueTest>(testNumber);
|
||||||
|
} else if (rand == 5) {
|
||||||
|
return std::make_unique<WriteMultipleValuesTest>(testNumber);
|
||||||
|
} else if (rand == 6) {
|
||||||
|
return std::make_unique<ClearTest>(testNumber);
|
||||||
|
} else if (rand == 7) {
|
||||||
|
return std::make_unique<ReadRangeTest>(testNumber);
|
||||||
|
} else {
|
||||||
|
return std::make_unique<LargeReadRangeTest>(testNumber);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runs one ITest: performs its setup, executes it inside a retry loop, and
// asserts that the committed transaction's total cost matches the test's
// expected final cost.
ACTOR static Future<Void> runTest(TransactionCostWorkload* self, Database cx, ITest* test) {
	wait(test->setup(*self, cx));
	state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
	if (self->debugTransactions) {
		test->debugTransaction(*tr);
	}
	loop {
		try {
			wait(test->exec(*self, tr));
			wait(tr->commit());
			// The cost check is the point of the workload: the client-side
			// cost estimate must equal the test's predicted value.
			ASSERT_EQ(tr->getTotalCost(), test->expectedFinalCost());
			return Void();
		} catch (Error& e) {
			wait(tr->onError(e));
		}
	}
}
|
||||||
|
|
||||||
|
// Sequentially runs `iterations` randomly chosen cost tests.
ACTOR static Future<Void> start(TransactionCostWorkload* self, Database cx) {
	state uint64_t testNumber = 0;
	// Must use shared_ptr because Flow doesn't support perfect forwarding into actors
	state std::shared_ptr<ITest> test;
	for (; testNumber < self->iterations; ++testNumber) {
		test = createRandomTest(testNumber);
		wait(runTest(self, cx, test.get()));
	}
	return Void();
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Reads workload options: number of test iterations, key prefix for all
// test keys, and whether to enable per-transaction debug logging.
TransactionCostWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
	iterations = getOption(options, "iterations"_sr, 1000);
	prefix = getOption(options, "prefix"_sr, "transactionCost/"_sr);
	debugTransactions = getOption(options, "debug"_sr, false);
}
|
||||||
|
|
||||||
|
// Workload name used to select this workload from test specification files.
static constexpr auto NAME = "TransactionCost";
|
||||||
|
|
||||||
|
// No global setup; each ITest performs its own setup in runTest.
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||||
|
|
||||||
|
// Only client 0 runs the tests; all other clients are no-ops.
Future<Void> start(Database const& cx) override { return clientId ? Void() : start(this, cx); }
|
||||||
|
|
||||||
|
// Always passes: cost verification happens inline via ASSERT_EQ in runTest.
Future<bool> check(Database const& cx) override { return true; }
|
||||||
|
|
||||||
|
// No custom performance metrics are reported by this workload.
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Registers the workload so the test harness can instantiate it by NAME.
WorkloadFactory<TransactionCostWorkload> TransactionCostWorkloadFactory;
|
|
@ -71,6 +71,7 @@ public: // introduced features
|
||||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_BOOL@, FutureGetBool);
|
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_BOOL@, FutureGetBool);
|
||||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_PROTOCOL_VERSION_API@, FutureProtocolVersionApi);
|
API_VERSION_FEATURE(@FDB_AV_FUTURE_PROTOCOL_VERSION_API@, FutureProtocolVersionApi);
|
||||||
API_VERSION_FEATURE(@FDB_AV_TENANT_BLOB_RANGE_API@, TenantBlobRangeApi);
|
API_VERSION_FEATURE(@FDB_AV_TENANT_BLOB_RANGE_API@, TenantBlobRangeApi);
|
||||||
|
API_VERSION_FEATURE(@FDB_AV_GET_TOTAL_COST@, GetTotalCost);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // FLOW_CODE_API_VERSION_H
|
#endif // FLOW_CODE_API_VERSION_H
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
# API Versions
|
# API Versions
|
||||||
set(FDB_AV_LATEST_VERSION "720")
|
set(FDB_AV_LATEST_VERSION "730")
|
||||||
|
|
||||||
# Features
|
# Features
|
||||||
set(FDB_AV_SNAPSHOT_RYW "300")
|
set(FDB_AV_SNAPSHOT_RYW "300")
|
||||||
|
@ -11,4 +11,5 @@ set(FDB_AV_BLOB_RANGE_API "720")
|
||||||
set(FDB_AV_CREATE_DB_FROM_CONN_STRING "720")
|
set(FDB_AV_CREATE_DB_FROM_CONN_STRING "720")
|
||||||
set(FDB_AV_FUTURE_GET_BOOL "720")
|
set(FDB_AV_FUTURE_GET_BOOL "720")
|
||||||
set(FDB_AV_FUTURE_PROTOCOL_VERSION_API "720")
|
set(FDB_AV_FUTURE_PROTOCOL_VERSION_API "720")
|
||||||
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
|
set(FDB_AV_TENANT_BLOB_RANGE_API "720")
|
||||||
|
set(FDB_AV_GET_TOTAL_COST "730")
|
||||||
|
|
|
@ -101,6 +101,7 @@ enum class TaskPriority {
|
||||||
UpdateStorage = 3000,
|
UpdateStorage = 3000,
|
||||||
CompactCache = 2900,
|
CompactCache = 2900,
|
||||||
TLogSpilledPeekReply = 2800,
|
TLogSpilledPeekReply = 2800,
|
||||||
|
SSSpilledChangeFeedReply = 2730,
|
||||||
BlobWorkerReadChangeFeed = 2720,
|
BlobWorkerReadChangeFeed = 2720,
|
||||||
BlobWorkerUpdateFDB = 2710,
|
BlobWorkerUpdateFDB = 2710,
|
||||||
BlobWorkerUpdateStorage = 2700,
|
BlobWorkerUpdateStorage = 2700,
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
#include "benchmark/benchmark.h"
|
#include "benchmark/benchmark.h"
|
||||||
|
|
||||||
|
#include "flow/IRandom.h"
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
#include "flow/DeterministicRandom.h"
|
#include "flow/DeterministicRandom.h"
|
||||||
#include "flow/network.h"
|
#include "flow/network.h"
|
||||||
|
@ -61,3 +62,32 @@ static void bench_net2(benchmark::State& benchState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
BENCHMARK(bench_net2)->Range(1, 1 << 16)->ReportAggregatesOnly(true);
|
BENCHMARK(bench_net2)->Range(1, 1 << 16)->ReportAggregatesOnly(true);
|
||||||
|
|
||||||
|
// Template arguments for bench_delay: selects whether the benchmarked loop
// body calls yield() (YIELD) or delay(0) (DELAY).
static constexpr bool DELAY = false;
static constexpr bool YIELD = true;
|
||||||
|
|
||||||
|
// Benchmarks the cost of a zero-length wait (yield() vs delay(0)) while the
// run loop's priority queue is pre-populated with pending timers.
ACTOR template <bool useYield>
static Future<Void> benchDelay(benchmark::State* benchState) {
	// Number of random delays to start, just to populate the run loop
	// priority queue.
	state int64_t timerCount = benchState->range(0);
	state std::vector<Future<Void>> futures;
	state DeterministicRandom rand(platform::getRandomSeed());
	while (--timerCount > 0) {
		futures.push_back(delay(1.0 + rand.random01(), getRandomTaskPriority(rand)));
	}

	// Timed section: one zero-length wait per iteration.
	while (benchState->KeepRunning()) {
		wait(useYield ? yield() : delay(0));
	}
	benchState->SetItemsProcessed(static_cast<long>(benchState->iterations()));
	return Void();
}
|
||||||
|
|
||||||
|
template <bool useYield>
|
||||||
|
static void bench_delay(benchmark::State& benchState) {
|
||||||
|
onMainThread([&benchState] { return benchDelay<useYield>(&benchState); }).blockUntilReady();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register both variants over timer counts 0..2^16.
BENCHMARK_TEMPLATE(bench_delay, DELAY)->Range(0, 1 << 16)->ReportAggregatesOnly(true);
BENCHMARK_TEMPLATE(bench_delay, YIELD)->Range(0, 1 << 16)->ReportAggregatesOnly(true);
|
||||||
|
|
|
@ -240,6 +240,7 @@ if(WITH_PYTHON)
|
||||||
add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)
|
add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)
|
||||||
add_fdb_test(TEST_FILES rare/Throttling.toml)
|
add_fdb_test(TEST_FILES rare/Throttling.toml)
|
||||||
add_fdb_test(TEST_FILES rare/ThroughputQuota.toml)
|
add_fdb_test(TEST_FILES rare/ThroughputQuota.toml)
|
||||||
|
add_fdb_test(TEST_FILES rare/TransactionCost.toml)
|
||||||
add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
|
add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
|
||||||
add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)
|
add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)
|
||||||
add_fdb_test(TEST_FILES rare/WriteTagThrottling.toml)
|
add_fdb_test(TEST_FILES rare/WriteTagThrottling.toml)
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
# Runs the TransactionCost workload, which asserts that each transaction's
# client-side cost estimate matches the expected value.
[[test]]
testTitle = 'TransactionCostTest'

    [[test.workload]]
    testName = 'TransactionCost'
    iterations = 1000
|
Loading…
Reference in New Issue