Merge branch 'main' of https://github.com/apple/foundationdb into fix/main/restoreStats

Xiaoxi Wang 2022-11-09 16:30:41 -08:00
commit 93daa9ebb7
16 changed files with 154 additions and 107 deletions


@ -653,12 +653,12 @@ const (
StreamingModeWantAll StreamingMode = -1
// The default. The client doesn't know how much of the range it is likely
// to use and wants different performance concerns to be balanced. Only a
// small portion of data is transferred to the client initially (in order to
// minimize costs if the client doesn't read the entire range), and as the
// caller iterates over more items in the range larger batches will be
// transferred in order to minimize latency. After enough iterations, the
// iterator mode will eventually reach the same byte limit as ``WANT_ALL``
// to use and wants different performance concerns to be balanced.
// Only a small portion of data is transferred to the client initially (in
// order to minimize costs if the client doesn't read the entire range), and
// as the caller iterates over more items in the range larger batches will
// be transferred in order to minimize latency. After enough iterations,
// the iterator mode will eventually reach the same byte limit as ``WANT_ALL``
StreamingModeIterator StreamingMode = 0
// Infrequently used. The client has passed a specific row limit and wants
@ -668,8 +668,8 @@ const (
// mode is used.
StreamingModeExact StreamingMode = 1
// Infrequently used. Transfer data in batches small enough to not be much
// more expensive than reading individual rows, to minimize cost if
// Infrequently used. Transfer data in batches small enough to not be
// much more expensive than reading individual rows, to minimize cost if
// iteration stops early.
StreamingModeSmall StreamingMode = 2
@ -677,16 +677,16 @@ const (
// large.
StreamingModeMedium StreamingMode = 3
// Infrequently used. Transfer data in batches large enough to be, in a
// high-concurrency environment, nearly as efficient as possible. If the
// client stops iteration early, some disk and network bandwidth may be
// wasted. The batch size may still be too small to allow a single client to
// get high throughput from the database, so if that is what you need,
// Infrequently used. Transfer data in batches large enough to be,
// in a high-concurrency environment, nearly as efficient as possible.
// If the client stops iteration early, some disk and network bandwidth may
// be wasted. The batch size may still be too small to allow a single client
// to get high throughput from the database, so if that is what you need,
// consider the SERIAL StreamingMode.
StreamingModeLarge StreamingMode = 4
// Transfer data in batches large enough that an individual client can get
// reasonable read bandwidth from the database. If the client stops
// Transfer data in batches large enough that an individual client can
// get reasonable read bandwidth from the database. If the client stops
// iteration early, considerable disk and network bandwidth may be wasted.
StreamingModeSerial StreamingMode = 5
)
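
For orientation, here is a minimal sketch of how a caller picks one of these modes through the Go binding's `RangeOptions`. The API version, key prefix, and error handling are illustrative assumptions, not part of this change:

```go
package main

import (
	"fmt"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func main() {
	fdb.MustAPIVersion(710) // assumed API version
	db := fdb.MustOpenDefault()

	_, err := db.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
		r, e := fdb.PrefixRange([]byte("app/")) // hypothetical key prefix
		if e != nil {
			return nil, e
		}
		// StreamingModeIterator (the default) starts with small batches and
		// grows them toward the WANT_ALL byte limit as iteration continues.
		it := tr.GetRange(r, fdb.RangeOptions{Mode: fdb.StreamingModeIterator}).Iterator()
		for it.Advance() {
			kv, e := it.Get()
			if e != nil {
				return nil, e
			}
			fmt.Printf("%s => %s\n", kv.Key, kv.Value)
		}
		return nil, nil
	})
	if err != nil {
		panic(err)
	}
}
```

If the whole range is needed up front, `fdb.RangeOptions{Mode: fdb.StreamingModeWantAll}` fetches it in as few batches as possible.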


@ -14,13 +14,13 @@ The global tag throttler cannot throttle tags to a throughput below the reserved
Internally, the units for these quotas are bytes. The cost of an operation is rounded up to the nearest multiple of the page size. The cost of a read operation is computed as:
```
readCost = ceiling(bytesRead / CLIENT_KNOBS->READ_COST_BYTE_FACTOR) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
readCost = ceiling(bytesRead / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
```
The cost of a write operation is computed as:
```
writeCost = CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * ceiling(bytesWritten / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
writeCost = CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * ceiling(bytesWritten / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
```
Here `bytesWritten` includes cleared bytes. The size of range clears is estimated at commit time.
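
As a worked illustration of the rounding above, here is a minimal sketch in Go, assuming this commit's default knob values (`TAG_THROTTLING_PAGE_SIZE = 16384`, `GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO = 5.0`); it mirrors the `((bytes - 1) / pageSize + 1) * pageSize` round-up used by the C++ helpers, which assumes `bytes > 0`:

```go
package main

import "fmt"

const (
	pageSize         = 16384 // CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE (default)
	fungibilityRatio = 5.0   // CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO (default)
)

// roundToPage rounds bytes up to the nearest multiple of the page size.
// Like the C++ helpers, it assumes bytes > 0.
func roundToPage(bytes uint64) uint64 {
	return ((bytes-1)/pageSize + 1) * pageSize
}

func main() {
	fmt.Println(roundToPage(100))   // 16384: a 100-byte read costs one full page
	fmt.Println(roundToPage(20000)) // 32768: a 20000-byte read spans two pages
	// Writes are additionally multiplied by the fungibility ratio.
	fmt.Println(fungibilityRatio * float64(roundToPage(100))) // 81920
}
```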


@ -91,11 +91,11 @@ ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
if (limitType == LimitType::TOTAL) {
// Round up to nearest page size
quota.totalQuota =
((value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
} else if (limitType == LimitType::RESERVED) {
// Round up to nearest page size
quota.reservedQuota =
((value - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
}
if (!quota.isValid()) {
throw invalid_throttle_quota_value();


@ -270,8 +270,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( TAG_THROTTLE_SMOOTHING_WINDOW, 2.0 );
init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
init( WRITE_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) WRITE_COST_BYTE_FACTOR = 4096;
init( READ_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) READ_COST_BYTE_FACTOR = 4096;
init( TAG_THROTTLING_PAGE_SIZE, 16384 ); if( randomize && BUGGIFY ) TAG_THROTTLING_PAGE_SIZE = 4096;
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
// busyness reporting


@ -5837,7 +5837,7 @@ void Transaction::clear(const KeyRangeRef& range, AddConflictRange addConflictRa
// NOTE: The throttling cost of each clear is assumed to be one page.
// This makes computation fast, but can be inaccurate and may
// underestimate the cost of large clears.
trState->totalCost += CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
trState->totalCost += CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
if (addConflictRange)
t.write_conflict_ranges.push_back(req.arena, r);
}


@ -260,8 +260,7 @@ public:
double TAG_THROTTLE_SMOOTHING_WINDOW;
double TAG_THROTTLE_RECHECK_INTERVAL;
double TAG_THROTTLE_EXPIRATION_INTERVAL;
int64_t WRITE_COST_BYTE_FACTOR; // Used to round up the cost of write operations
int64_t READ_COST_BYTE_FACTOR; // Used to round up the cost of read operations
int64_t TAG_THROTTLING_PAGE_SIZE; // Used to round up the cost of operations
// Cost multiplier for writes (because write operations are more expensive than reads):
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;


@ -574,13 +574,13 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion
// Measured in bytes, rounded up to the nearest page size, and multiplied by the
// fungibility ratio because writes are more expensive than reads.
inline uint64_t getWriteOperationCost(uint64_t bytes) {
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR *
((bytes - 1) / CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR + 1);
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE *
((bytes - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1);
}
// Measured in bytes, rounded up to the nearest page size.
inline uint64_t getReadOperationCost(uint64_t bytes) {
return ((bytes - 1) / CLIENT_KNOBS->READ_COST_BYTE_FACTOR + 1) * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
return ((bytes - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
}
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value


@ -56,6 +56,8 @@ struct CheckpointMetaData {
// Serialized metadata associated with the format; this data can be understood by the corresponding KVS.
Standalone<StringRef> serializedCheckpoint;
UID dataMoveId;
CheckpointMetaData() = default;
CheckpointMetaData(KeyRange const& range, CheckpointFormat format, UID const& ssID, UID const& checkpointID)
: version(invalidVersion), format(format), ssID(ssID), checkpointID(checkpointID), state(Pending),
@ -67,6 +69,9 @@ struct CheckpointMetaData {
gcTime(0) {
this->ranges.push_back(range);
}
CheckpointMetaData(Version version, CheckpointFormat format, UID checkpointID)
: version(version), format(format), ssID(UID()), checkpointID(checkpointID), state(Pending), referenceCount(0),
gcTime(0) {}
CheckpointState getState() const { return static_cast<CheckpointState>(state); }
@ -77,16 +82,17 @@ struct CheckpointMetaData {
void setFormat(CheckpointFormat format) { this->format = static_cast<int16_t>(format); }
std::string toString() const {
std::string res = "Checkpoint MetaData:\nRange: " + describe(ranges) + "\nVersion: " + std::to_string(version) +
"\nFormat: " + std::to_string(format) + "\nServer: " + ssID.toString() +
"\nID: " + checkpointID.toString() + "\nState: " + std::to_string(static_cast<int>(state)) +
"\n";
std::string res = "Checkpoint MetaData: [Ranges]: " + describe(ranges) +
" [Version]: " + std::to_string(version) + " [Format]: " + std::to_string(format) +
" [Server]: " + ssID.toString() + " [ID]: " + checkpointID.toString() +
" [State]: " + std::to_string(static_cast<int>(state)) +
" [DataMove ID]: " + dataMoveId.toString();
return res;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, ranges, format, state, checkpointID, ssID, gcTime, serializedCheckpoint);
serializer(ar, version, ranges, format, state, checkpointID, ssID, gcTime, serializedCheckpoint, dataMoveId);
}
};


@ -229,7 +229,7 @@ class GlobalTagThrottlerImpl {
if (transactionRate == 0.0) {
return {};
} else {
return std::max(static_cast<double>(CLIENT_KNOBS->READ_COST_BYTE_FACTOR), cost.get() / transactionRate);
return std::max(static_cast<double>(CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE), cost.get() / transactionRate);
}
}
@ -239,7 +239,7 @@ class GlobalTagThrottlerImpl {
auto const cost = getCurrentCost(tag);
auto const stats = tryGet(tagStatistics, tag);
if (!stats.present()) {
return CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
return CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
}
auto const transactionRate = stats.get().getTransactionRate();
// FIXME: Disabled due to noisy trace events. Fix the noise and re-enable.
@ -250,9 +250,9 @@ class GlobalTagThrottlerImpl {
.detail("Cost", cost);
*/
if (transactionRate == 0.0) {
return CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
return CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
} else {
return std::max(static_cast<double>(CLIENT_KNOBS->READ_COST_BYTE_FACTOR), cost / transactionRate);
return std::max(static_cast<double>(CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE), cost / transactionRate);
}
}
@ -679,7 +679,7 @@ public:
result.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
}
result.lastReply.bytesInput = ((totalReadCost.smoothRate() + totalWriteCost.smoothRate()) /
(capacity * CLIENT_KNOBS->READ_COST_BYTE_FACTOR)) *
(capacity * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE)) *
SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER;
return result;
}
@ -702,7 +702,7 @@ public:
std::vector<int> const& storageServerIndices,
OpType opType) {
if (storageServerIndices.empty()) {
auto const costPerSS = CLIENT_KNOBS->READ_COST_BYTE_FACTOR * (pagesPerSecond / storageServers.size());
auto const costPerSS = CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE * (pagesPerSecond / storageServers.size());
for (auto& storageServer : storageServers) {
if (opType == OpType::READ) {
storageServer.addReadCost(tag, costPerSS);
@ -711,7 +711,8 @@ public:
}
}
} else {
auto const costPerSS = CLIENT_KNOBS->READ_COST_BYTE_FACTOR * (pagesPerSecond / storageServerIndices.size());
auto const costPerSS =
CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE * (pagesPerSecond / storageServerIndices.size());
for (auto i : storageServerIndices) {
if (opType == OpType::READ) {
storageServers[i].addReadCost(tag, costPerSS);
@ -832,7 +833,7 @@ TEST_CASE("/GlobalTagThrottler/Simple") {
state StorageServerCollection storageServers(10, 100);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ);
state Future<Void> monitor =
@ -851,7 +852,7 @@ TEST_CASE("/GlobalTagThrottler/WriteThrottling") {
state StorageServerCollection storageServers(10, 100);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::WRITE);
state Future<Void> monitor = monitorActor(&globalTagThrottler, [testTag](auto& gtt) {
@ -873,7 +874,7 @@ TEST_CASE("/GlobalTagThrottler/MultiTagThrottling") {
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag1, tagQuotaValue);
globalTagThrottler.setQuota(testTag2, tagQuotaValue);
state std::vector<Future<Void>> futures;
@ -897,7 +898,7 @@ TEST_CASE("/GlobalTagThrottler/AttemptWorkloadAboveQuota") {
state StorageServerCollection storageServers(10, 100);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, OpType::READ);
state Future<Void> monitor =
@ -916,7 +917,7 @@ TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") {
state StorageServerCollection storageServers(10, 100);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ);
state Future<Void> client2 = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ);
@ -938,7 +939,7 @@ TEST_CASE("/GlobalTagThrottler/MultiClientThrottling2") {
state StorageServerCollection storageServers(10, 100);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, OpType::READ);
state Future<Void> client2 = runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, OpType::READ);
@ -961,7 +962,7 @@ TEST_CASE("/GlobalTagThrottler/SkewedMultiClientThrottling") {
state StorageServerCollection storageServers(10, 100);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 5.0, OpType::READ);
state Future<Void> client2 = runClient(&globalTagThrottler, &storageServers, testTag, 25.0, 5.0, OpType::READ);
@ -985,14 +986,14 @@ TEST_CASE("/GlobalTagThrottler/UpdateQuota") {
state StorageServerCollection storageServers(10, 100);
state ThrottleApi::TagQuotaValue tagQuotaValue;
state TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ);
state Future<Void> monitor = monitorActor(
&globalTagThrottler, [](auto& gtt) { return targetRateIsNear(gtt, "sampleTag1"_sr, 100.0 / 6.0); });
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 600.0));
tagQuotaValue.totalQuota = 50 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 50 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
monitor =
monitorActor(&globalTagThrottler, [](auto& gtt) { return targetRateIsNear(gtt, "sampleTag1"_sr, 50.0 / 6.0); });
@ -1011,7 +1012,7 @@ TEST_CASE("/GlobalTagThrottler/RemoveQuota") {
state StorageServerCollection storageServers(10, 100);
state ThrottleApi::TagQuotaValue tagQuotaValue;
state TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ);
state Future<Void> monitor = monitorActor(
@ -1033,7 +1034,7 @@ TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
state StorageServerCollection storageServers(10, 5);
state ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 10.0, 6.0, OpType::READ);
state Future<Void> monitor = monitorActor(&globalTagThrottler, [testTag](auto& gtt) {
@ -1057,8 +1058,8 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling") {
state ThrottleApi::TagQuotaValue tagQuotaValue2;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue1.totalQuota = 50 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue2.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue1.totalQuota = 50 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
tagQuotaValue2.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag1, tagQuotaValue1);
globalTagThrottler.setQuota(testTag2, tagQuotaValue2);
std::vector<Future<Void>> futures;
@ -1086,8 +1087,8 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling2") {
state ThrottleApi::TagQuotaValue tagQuotaValue2;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue1.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue2.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue1.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
tagQuotaValue2.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag1, tagQuotaValue1);
globalTagThrottler.setQuota(testTag2, tagQuotaValue2);
std::vector<Future<Void>> futures;
@ -1116,8 +1117,8 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling3") {
state ThrottleApi::TagQuotaValue tagQuotaValue2;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue1.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue2.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue1.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
tagQuotaValue2.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag1, tagQuotaValue1);
globalTagThrottler.setQuota(testTag2, tagQuotaValue2);
std::vector<Future<Void>> futures;
@ -1143,8 +1144,8 @@ TEST_CASE("/GlobalTagThrottler/ReservedQuota") {
state StorageServerCollection storageServers(10, 5);
state ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.reservedQuota = 70 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
tagQuotaValue.reservedQuota = 70 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 10.0, 6.0, OpType::READ);
state Future<Void> monitor =
@ -1206,7 +1207,7 @@ TEST_CASE("/GlobalTagThrottler/IgnoreWorstZone") {
state TransactionTag testTag = "sampleTag1"_sr;
storageServers.setCapacity(0, 1);
ThrottleApi::TagQuotaValue tagQuotaValue;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR;
tagQuotaValue.totalQuota = 100 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ);
state Future<Void> monitor = monitorActor(


@ -2205,7 +2205,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
TraceEvent("RocksDBServeCheckpointBegin", id)
.detail("MinVersion", a.request.version)
.detail("Range", a.request.range.toString())
.detail("Ranges", describe(a.request.ranges))
.detail("Format", static_cast<int>(a.request.format))
.detail("CheckpointDir", a.request.checkpointDir);
@ -2236,7 +2236,8 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
.detail("PersistVersion", version);
// TODO: set the range as the actual shard range.
CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID);
CheckpointMetaData res(version, a.request.format, a.request.checkpointID);
res.ranges = a.request.ranges;
const std::string& checkpointDir = abspath(a.request.checkpointDir);
if (a.request.format == RocksDBColumnFamily) {
@ -2519,7 +2520,7 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreColumnFamily")
state std::string checkpointDir = cwd + "checkpoint";
CheckpointRequest request(
latestVersion, allKeys, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir);
latestVersion, { allKeys }, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir);
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
std::vector<CheckpointMetaData> checkpoints;
@ -2559,7 +2560,8 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreKeyValues") {
platform::eraseDirectoryRecursive("checkpoint");
std::string checkpointDir = cwd + "checkpoint";
CheckpointRequest request(latestVersion, allKeys, RocksDB, deterministicRandom()->randomUniqueID(), checkpointDir);
CheckpointRequest request(
latestVersion, { allKeys }, RocksDB, deterministicRandom()->randomUniqueID(), checkpointDir);
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
state ICheckpointReader* cpReader = newCheckpointReader(metaData, deterministicRandom()->randomUniqueID());


@ -33,17 +33,17 @@
struct CheckpointRequest {
const Version version; // The FDB version at which the checkpoint is created.
const KeyRange range; // Keyrange this checkpoint must contain.
const std::vector<KeyRange> ranges; // Keyranges this checkpoint must contain.
const CheckpointFormat format;
const UID checkpointID;
const std::string checkpointDir; // The local directory where the checkpoint file will be created.
CheckpointRequest(const Version version,
const KeyRange& range,
const std::vector<KeyRange>& ranges,
const CheckpointFormat format,
const UID& id,
const std::string& checkpointDir)
: version(version), range(range), format(format), checkpointID(id), checkpointDir(checkpointDir) {}
: version(version), ranges(ranges), format(format), checkpointID(id), checkpointDir(checkpointDir) {}
};
class IKeyValueStore : public IClosable {


@ -250,7 +250,7 @@ struct AddingShard : NonCopyable {
class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
ShardInfo(KeyRange keys, std::unique_ptr<AddingShard>&& adding, StorageServer* readWrite)
: adding(std::move(adding)), readWrite(readWrite), keys(keys), version(0) {}
: adding(std::move(adding)), readWrite(readWrite), keys(keys), shardId(0LL), desiredShardId(0LL), version(0) {}
public:
// A shard has 3 mutually exclusive states: adding, readWrite, and notAssigned.
@ -1955,8 +1955,18 @@ Future<Version> waitForVersion(StorageServer* data,
if (readVersion < data->oldestVersion.get() || readVersion <= 0) {
return transaction_too_old();
} else {
// It is correct to read at any version in [commitVersion, readVersion],
// because the version vector guarantees no mutations between them.
if (commitVersion < data->oldestVersion.get()) {
return data->oldestVersion.get();
if (data->version.get() < readVersion) {
// In the majority of cases, use the higher version to avoid a
// transaction_too_old error when oldestVersion advances.
// Note that any version in the range [oldestVersion, data->version.get()] is valid in this case.
return data->version.get();
} else {
ASSERT(readVersion >= data->oldestVersion.get());
return readVersion;
}
} else if (commitVersion <= data->version.get()) {
return commitVersion;
}
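
Restated as a sketch (illustrative Go with simplified names; the real code is the C++ actor above, which waits rather than returning a flag when the server is behind):

```go
package main

import "fmt"

// pickReadVersion restates the selection rule added above: any version in
// [commitVersion, readVersion] is safe to serve because the version vector
// guarantees no mutations occur between them.
func pickReadVersion(commitVersion, readVersion, oldestVersion, ssVersion int64) (int64, bool) {
	if commitVersion < oldestVersion {
		if ssVersion < readVersion {
			// Prefer the server's higher version so the read does not hit
			// transaction_too_old as oldestVersion advances.
			return ssVersion, true
		}
		return readVersion, true
	}
	if commitVersion <= ssVersion {
		return commitVersion, true
	}
	return 0, false // the caller must wait for the server to catch up
}

func main() {
	v, ok := pickReadVersion(90, 120, 100, 110)
	fmt.Println(v, ok) // 110 true: the server's version is below readVersion
}
```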
@ -3606,7 +3616,11 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
ASSERT(atStorageVersion.size() <= limit);
if (data->storageVersion() > version) {
// TraceEvent(SevDebug, "ReadRangeThrow", data->thisServerID).detail("Version", version).detail("StorageVersion", data->storageVersion());
DisabledTraceEvent("SS_TTO", data->thisServerID)
.detail("StorageVersion", data->storageVersion())
.detail("Oldest", data->oldestVersion.get())
.detail("Version", version)
.detail("Range", range);
throw transaction_too_old();
}
@ -3702,8 +3716,14 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
data->readRangeBytesLimitHistogram->sample(*pLimitBytes);
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version)
if (data->storageVersion() > version) {
DisabledTraceEvent("SS_TTO", data->thisServerID)
.detail("StorageVersion", data->storageVersion())
.detail("Oldest", data->oldestVersion.get())
.detail("Version", version)
.detail("Range", range);
throw transaction_too_old();
}
int prevSize = result.data.size();
merge(result.arena,
@ -3934,6 +3954,14 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context));
DisabledTraceEvent("VVV", data->thisServerID)
.detail("Version", version)
.detail("ReqVersion", req.version)
.detail("Oldest", data->oldestVersion.get())
.detail("VV", req.ssLatestCommitVersions.toString())
.detail("DebugID",
req.options.present() && req.options.get().debugID.present() ? req.options.get().debugID.get()
: UID());
data->counters.readVersionWaitSample.addMeasurement(g_network->timer() - queueWaitEnd);
state Optional<TenantMapEntry> tenantEntry = data->getTenantEntry(version, req.tenantInfo);
@ -9107,7 +9135,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
ASSERT(metaData.ssID == data->thisServerID && !metaData.ranges.empty());
const CheckpointRequest req(metaData.version,
metaData.ranges.front(),
metaData.ranges,
static_cast<CheckpointFormat>(metaData.format),
metaData.checkpointID,
data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());


@ -33,8 +33,8 @@ struct BulkSetupWorkload : TestWorkload {
int nodeCount;
double transactionsPerSecond;
Key keyPrefix;
double maxNumTenantsPerClient;
double minNumTenantsPerClient;
double maxNumTenants;
double minNumTenants;
std::vector<TenantName> tenantNames;
BulkSetupWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
@ -42,9 +42,9 @@ struct BulkSetupWorkload : TestWorkload {
nodeCount = getOption(options, "nodeCount"_sr, transactionsPerSecond * clientCount);
keyPrefix = unprintable(getOption(options, "keyPrefix"_sr, ""_sr).toString());
// maximum and minimum number of tenants
maxNumTenantsPerClient = getOption(options, "maxNumTenantsPerClient"_sr, 0);
minNumTenantsPerClient = getOption(options, "minNumTenantsPerClient"_sr, 0);
ASSERT(minNumTenantsPerClient <= maxNumTenantsPerClient);
maxNumTenants = getOption(options, "maxNumTenants"_sr, 0);
minNumTenants = getOption(options, "minNumTenants"_sr, 0);
ASSERT(minNumTenants <= maxNumTenants);
}
void getMetrics(std::vector<PerfMetric>& m) override {}
@ -58,14 +58,14 @@ struct BulkSetupWorkload : TestWorkload {
ACTOR static Future<Void> _setup(BulkSetupWorkload* workload, Database cx) {
// create a bunch of tenants (between min and max tenants)
state int numTenantsToCreate =
deterministicRandom()->randomInt(workload->minNumTenantsPerClient, workload->maxNumTenantsPerClient + 1);
deterministicRandom()->randomInt(workload->minNumTenants, workload->maxNumTenants + 1);
TraceEvent("BulkSetupTenantCreation").detail("NumTenants", numTenantsToCreate);
if (numTenantsToCreate > 0) {
std::vector<Future<Void>> tenantFutures;
for (int i = 0; i < numTenantsToCreate; i++) {
TenantMapEntry entry;
entry.encrypted = SERVER_KNOBS->ENABLE_ENCRYPTION;
workload->tenantNames.push_back(TenantName(format("BulkSetupTenant_%04d_%04d", workload->clientId, i)));
workload->tenantNames.push_back(TenantName(format("BulkSetupTenant_%04d", i)));
TraceEvent("CreatingTenant")
.detail("Tenant", workload->tenantNames.back())
.detail("TenantGroup", entry.tenantGroup);
@ -78,23 +78,31 @@ struct BulkSetupWorkload : TestWorkload {
}
Future<Void> start(Database const& cx) override {
return bulkSetup(cx,
this,
nodeCount,
Promise<double>(),
false,
0.0,
1e12,
std::vector<uint64_t>(),
Promise<std::vector<std::pair<uint64_t, double>>>(),
0,
0.1,
0,
0,
tenantNames);
if (clientId == 0) {
return bulkSetup(cx,
this,
nodeCount,
Promise<double>(),
false,
0.0,
1e12,
std::vector<uint64_t>(),
Promise<std::vector<std::pair<uint64_t, double>>>(),
0,
0.1,
0,
0,
tenantNames);
}
return Void();
}
Future<Void> setup(Database const& cx) override { return _setup(this, cx); }
Future<Void> setup(Database const& cx) override {
if (clientId == 0) {
return _setup(this, cx);
}
return Void();
}
Future<bool> check(Database const& cx) override { return true; }
};


@ -566,6 +566,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
req.options = ReadOptions(debugRandom()->randomUniqueID());
DisabledTraceEvent("CCD", req.options.get().debugID.get()).detail("Version", version);
// Try getting the entries in the specified range
state std::vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;


@ -59,7 +59,7 @@ class TransactionCostWorkload : public TestWorkload {
return success(tr->get(workload.getKey(testNumber)));
}
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; }
};
class ReadLargeValueTest : public ITest {
@ -69,7 +69,7 @@ class TransactionCostWorkload : public TestWorkload {
state Transaction tr(cx);
loop {
try {
tr.set(workload->getKey(self->testNumber), getValue(CLIENT_KNOBS->READ_COST_BYTE_FACTOR));
tr.set(workload->getKey(self->testNumber), getValue(CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE));
wait(tr.commit());
return Void();
} catch (Error& e) {
@ -89,7 +89,7 @@ class TransactionCostWorkload : public TestWorkload {
return success(tr->get(workload.getKey(testNumber)));
}
int64_t expectedFinalCost() const override { return 2 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
int64_t expectedFinalCost() const override { return 2 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; }
};
class WriteTest : public ITest {
@ -102,7 +102,7 @@ class TransactionCostWorkload : public TestWorkload {
}
int64_t expectedFinalCost() const override {
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
return CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
}
};
@ -111,12 +111,13 @@ class TransactionCostWorkload : public TestWorkload {
explicit WriteLargeValueTest(int64_t testNumber) : ITest(testNumber) {}
Future<Void> exec(TransactionCostWorkload const& workload, Reference<ReadYourWritesTransaction> tr) override {
tr->set(workload.getKey(testNumber), getValue(CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR));
tr->set(workload.getKey(testNumber), getValue(CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE));
return Void();
}
int64_t expectedFinalCost() const override {
return 2 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
return 2 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO *
CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
}
};
@ -132,7 +133,8 @@ class TransactionCostWorkload : public TestWorkload {
}
int64_t expectedFinalCost() const override {
return 10 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR;
return 10 * CLIENT_KNOBS->GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO *
CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
}
};
@ -145,7 +147,7 @@ class TransactionCostWorkload : public TestWorkload {
return Void();
}
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR; }
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; }
};
class ReadRangeTest : public ITest {
@ -176,7 +178,7 @@ class TransactionCostWorkload : public TestWorkload {
return success(tr->getRange(keys, 10));
}
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
int64_t expectedFinalCost() const override { return CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; }
};
class ReadMultipleValuesTest : public ITest {
@ -212,7 +214,7 @@ class TransactionCostWorkload : public TestWorkload {
return waitForAll(futures);
}
int64_t expectedFinalCost() const override { return 10 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
int64_t expectedFinalCost() const override { return 10 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; }
};
class LargeReadRangeTest : public ITest {
@ -224,7 +226,7 @@ class TransactionCostWorkload : public TestWorkload {
try {
for (int i = 0; i < 10; ++i) {
tr.set(workload->getKey(self->testNumber, i),
workload->getValue(CLIENT_KNOBS->READ_COST_BYTE_FACTOR));
workload->getValue(CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE));
}
wait(tr.commit());
return Void();
@ -246,7 +248,7 @@ class TransactionCostWorkload : public TestWorkload {
return success(tr->getRange(keys, 10));
}
int64_t expectedFinalCost() const override { return 11 * CLIENT_KNOBS->READ_COST_BYTE_FACTOR; }
int64_t expectedFinalCost() const override { return 11 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; }
};
static std::unique_ptr<ITest> createRandomTest(int64_t testNumber) {


@ -14,8 +14,8 @@ simBackupAgents = 'BackupToFile'
[[test.workload]]
testName = 'BulkLoadWithTenants'
maxNumTenantsPerClient = 100
minNumTenantsPerClient = 0
maxNumTenants = 100
minNumTenants = 0
transactionsPerSecond = 2500.0
[[test.workload]]