Merge remote-tracking branch 'origin/main' into main-fix-op-cost-bug

commit 3c6941192e
@@ -393,7 +393,7 @@ class TestRun:
     def delete_simdir(self):
         shutil.rmtree(self.temp_path / Path("simfdb"))

-    def _run_rocksdb_logtool(self):
+    def _run_joshua_logtool(self):
        """Calls Joshua LogTool to upload the test logs if 1) test failed 2) test is RocksDB related"""
        if not os.path.exists("joshua_logtool.py"):
            raise RuntimeError("joshua_logtool.py missing")
@@ -407,7 +407,12 @@ class TestRun:
            str(self.temp_path),
            "--check-rocksdb",
        ]
-        subprocess.run(command, check=True)
+        result = subprocess.run(command, capture_output=True, text=True)
+        return {
+            "stdout": result.stdout,
+            "stderr": result.stderr,
+            "exit_code": result.returncode,
+        }

     def run(self):
         command: List[str] = []
@@ -498,10 +503,16 @@ class TestRun:
        self.summary.was_killed = did_kill
        self.summary.valgrind_out_file = valgrind_file
        self.summary.error_out = err_out
+        if not self.summary.is_negative_test and not self.summary.ok():
+            logtool_result = self._run_joshua_logtool()
+            if logtool_result["exit_code"] != 0:
+                child = SummaryTree("JoshuaLogTool")
+                child.attributes["ExitCode"] = str(logtool_result["exit_code"])
+                child.attributes["StdOut"] = logtool_result["stdout"]
+                child.attributes["StdErr"] = logtool_result["stderr"]
+                self.summary.out.append(child)
        self.summary.summarize(self.temp_path, " ".join(command))

-        if not self.summary.is_negative_test and not self.summary.ok():
-            self._run_rocksdb_logtool()
        return self.summary.ok()

@@ -1,6 +1,6 @@
 #! /usr/bin/env python3

-"""rocksdb_logtool.py
+"""joshua_logtool.py

 Provides uploading/downloading FoundationDB log files to Joshua cluster.
 """
@@ -129,7 +129,7 @@ def list_commands(ensemble_id: str):


 def _setup_args():
-    parser = argparse.ArgumentParser(prog="rocksdb_logtool.py")
+    parser = argparse.ArgumentParser(prog="joshua_logtool.py")

     parser.add_argument(
         "--cluster-file", type=str, default=None, help="Joshua FDB cluster file"

@@ -3800,12 +3800,13 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase {
        }

        // First and last key are the range for this file
-       state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
+       state KeyRange fileRange;
        state std::vector<KeyRange> originalFileRanges;
        // If fileRange doesn't intersect restore range then we're done.
        state int index;
        for (index = 0; index < restoreRanges.get().size(); index++) {
            auto& restoreRange = restoreRanges.get()[index];
+           fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
            if (!fileRange.intersects(restoreRange))
                continue;

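The hunk above moves the computation of fileRange from before the loop to inside it, so the range is rebuilt from the block data for every restore range rather than carried across iterations. If the loop body narrows fileRange to its intersection with the current restore range (as per-range processing typically does), a hoisted value would leak that clamped state into later iterations and make disjoint-looking ranges get skipped. A minimal standalone sketch of the pitfall, using an illustrative Range type rather than FDB's KeyRange:

#include <algorithm>
#include <cstdio>
#include <utility>
#include <vector>

using Range = std::pair<int, int>; // [begin, end), stand-in for a key range

int main() {
    const Range blockRange{ 0, 100 }; // derived from the file's block data
    std::vector<Range> restoreRanges{ { 0, 10 }, { 50, 60 } };
    Range fileRange;
    for (auto const& r : restoreRanges) {
        // Recompute per iteration: if a previous iteration clamped fileRange
        // to its own intersection, reusing the clamped value could make a
        // later range appear disjoint and be skipped.
        fileRange = blockRange;
        int begin = std::max(fileRange.first, r.first);
        int end = std::min(fileRange.second, r.second);
        if (begin >= end)
            continue; // no intersection with this restore range
        fileRange = { begin, end }; // per-range clamp, as processing might do
        std::printf("restore [%d,%d)\n", fileRange.first, fileRange.second);
    }
    return 0;
}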
@@ -829,7 +829,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( AUTO_TAG_THROTTLE_UPDATE_FREQUENCY, 10.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_UPDATE_FREQUENCY = 0.5;
    init( TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL, 30.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL = 1.0;
    init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
-   init( SS_THROTTLE_TAGS_TRACKED, 1 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10);
+   init( SS_THROTTLE_TAGS_TRACKED, 5 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10);
    init( GLOBAL_TAG_THROTTLING, true ); if(isSimulated) GLOBAL_TAG_THROTTLING = deterministicRandom()->coinflip();
    init( ENFORCE_TAG_THROTTLING_ON_PROXIES, GLOBAL_TAG_THROTTLING );
    init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );

@@ -37,6 +37,8 @@

 #include "flow/actorcompiler.h" // This must be the last #include.

+#define DEBUG_GET_CIPHER false
+
 template <class T>
 Optional<EncryptKeyProxyInterface> _getEncryptKeyProxyInterface(const Reference<AsyncVar<T> const>& db) {
    if constexpr (std::is_same_v<T, ClientDBInfo>) {
@@ -62,9 +64,13 @@ Future<Void> _onEncryptKeyProxyChange(Reference<AsyncVar<T> const> db) {
            break;
        }
    }
-   TraceEvent("GetEncryptCipherKeysEncryptKeyProxyChanged")
-       .detail("PreviousProxyId", previousProxyId.orDefault(UID()))
-       .detail("CurrentProxyId", currentProxyId.orDefault(UID()));
+   if (DEBUG_GET_CIPHER) {
+       TraceEvent(SevDebug, "GetEncryptCipherKeysEncryptKeyProxyChanged")
+           .detail("PreviousProxyId", previousProxyId.orDefault(UID()))
+           .detail("CurrentProxyId", currentProxyId.orDefault(UID()));
+   }

    return Void();
 }
@@ -75,7 +81,10 @@ Future<EKPGetLatestBaseCipherKeysReply> _getUncachedLatestEncryptCipherKeys(Refe
    Optional<EncryptKeyProxyInterface> proxy = _getEncryptKeyProxyInterface(db);
    if (!proxy.present()) {
        // Wait for onEncryptKeyProxyChange.
-       TraceEvent("GetLatestEncryptCipherKeysEncryptKeyProxyNotPresent").detail("UsageType", toString(usageType));
+       if (DEBUG_GET_CIPHER) {
+           TraceEvent(SevDebug, "GetLatestEncryptCipherKeysEncryptKeyProxyNotPresent")
+               .detail("UsageType", toString(usageType));
+       }
        return Never();
    }
    request.reply.reset();
@@ -178,7 +187,10 @@ Future<EKPGetBaseCipherKeysByIdsReply> _getUncachedEncryptCipherKeys(Reference<A
    Optional<EncryptKeyProxyInterface> proxy = _getEncryptKeyProxyInterface(db);
    if (!proxy.present()) {
        // Wait for onEncryptKeyProxyChange.
-       TraceEvent("GetEncryptCipherKeysEncryptKeyProxyNotPresent").detail("UsageType", toString(usageType));
+       if (DEBUG_GET_CIPHER) {
+           TraceEvent(SevDebug, "GetEncryptCipherKeysEncryptKeyProxyNotPresent")
+               .detail("UsageType", toString(usageType));
+       }
        return Never();
    }
    request.reply.reset();

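The pattern above gates chatty per-request traces behind a compile-time constant: with DEBUG_GET_CIPHER defined as false, the if body is dead code the compiler can drop, and flipping a single macro re-enables the SevDebug tracing. A tiny self-contained sketch of the same gate, with printf standing in for TraceEvent:

#include <cstdio>

#define DEBUG_GET_CIPHER false

static void traceProxyChange(int prev, int cur) {
    if (DEBUG_GET_CIPHER) { // constant-false: branch is compiled out
        std::printf("proxy changed: %d -> %d\n", prev, cur);
    }
}

int main() {
    traceProxyChange(1, 2); // prints nothing unless the macro is flipped to true
    return 0;
}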
@@ -1168,23 +1168,27 @@ struct GetStorageMetricsRequest {
    }
 };

+// Tracks the busyness of tags on individual storage servers.
+struct BusyTagInfo {
+   constexpr static FileIdentifier file_identifier = 4528694;
+   TransactionTag tag;
+   double rate{ 0.0 };
+   double fractionalBusyness{ 0.0 };
+
+   BusyTagInfo() = default;
+   BusyTagInfo(TransactionTag const& tag, double rate, double fractionalBusyness)
+     : tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {}
+
+   bool operator<(BusyTagInfo const& rhs) const { return rate < rhs.rate; }
+   bool operator>(BusyTagInfo const& rhs) const { return rate > rhs.rate; }
+
+   template <class Ar>
+   void serialize(Ar& ar) {
+       serializer(ar, tag, rate, fractionalBusyness);
+   }
+};
+
 struct StorageQueuingMetricsReply {
-   struct TagInfo {
-       constexpr static FileIdentifier file_identifier = 4528694;
-       TransactionTag tag;
-       double rate{ 0.0 };
-       double fractionalBusyness{ 0.0 };
-
-       TagInfo() = default;
-       TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness)
-         : tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {}
-
-       template <class Ar>
-       void serialize(Ar& ar) {
-           serializer(ar, tag, rate, fractionalBusyness);
-       }
-   };
-
    constexpr static FileIdentifier file_identifier = 7633366;
    double localTime;
    int64_t instanceID; // changes if bytesDurable and bytesInput reset
@@ -1195,7 +1199,7 @@ struct StorageQueuingMetricsReply {
    double cpuUsage{ 0.0 };
    double diskUsage{ 0.0 };
    double localRateLimit;
-   std::vector<TagInfo> busiestTags;
+   std::vector<BusyTagInfo> busiestTags;

    template <class Ar>
    void serialize(Ar& ar) {

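BusyTagInfo defines both operator< and operator> on rate, so it can be ordered either way by the standard comparators; in particular, std::priority_queue with std::greater becomes a min-heap on rate, which is the natural shape for a bounded top-K selection (evict the current minimum whenever a busier tag arrives). A standalone sketch of that pattern, with an illustrative Info type in place of BusyTagInfo:

#include <cstdio>
#include <functional>
#include <queue>
#include <string>
#include <vector>

struct Info {
    std::string tag;
    double rate = 0.0;
    bool operator>(Info const& rhs) const { return rate > rhs.rate; }
};

int main() {
    // std::greater<Info> uses operator>, making this a min-heap keyed on rate.
    std::priority_queue<Info, std::vector<Info>, std::greater<Info>> topK;
    const size_t k = 2;
    for (auto const& i : { Info{ "a", 5.0 }, Info{ "b", 9.0 }, Info{ "c", 2.0 } }) {
        if (topK.size() < k) {
            topK.push(i);
        } else if (topK.top().rate < i.rate) {
            topK.pop(); // evict the least busy of the current top K
            topK.push(i);
        }
    }
    while (!topK.empty()) { // pops the two busiest, smallest first: a (5.0), b (9.0)
        std::printf("%s %.1f\n", topK.top().tag.c_str(), topK.top().rate);
        topK.pop();
    }
    return 0;
}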
@@ -466,7 +466,7 @@ public:
        // Currently there is no differentiation between batch priority and default priority transactions
        TraceEvent te("GlobalTagThrottler_GotRate", id);
        bool const traceEnabled = stats.canLog();
-       if (traceEnabled) {
+       if (!traceEnabled) {
            te.disable();
        }
        bool isBusy{ false };
@@ -497,7 +497,8 @@ public:
        // Currently there is no differentiation between batch priority and default priority transactions
        bool isBusy{ false };
        TraceEvent te("GlobalTagThrottler_GotClientRate", id);
-       if (!stats.canLog()) {
+       bool const traceEnabled = stats.canLog();
+       if (!traceEnabled) {
            te.disable();
        }
        auto const targetTps = getTargetTps(tag, isBusy, te);
@@ -510,7 +511,9 @@ public:
            auto const clientRate = stats.updateAndGetPerClientLimit(targetTps.get());
            result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = clientRate;
            te.detail("ClientTps", clientRate.tpsRate);
-           stats.updateLastLogged();
+           if (traceEnabled) {
+               stats.updateLastLogged();
+           }
        } else {
            te.disable();
        }

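The first hunk above is a straight bug fix: the event was disabled exactly when stats.canLog() said logging was allowed, inverting the intended rate limiting. The later hunks make GotClientRate follow the same corrected shape and only record the last-logged time when a trace was actually emitted. A minimal sketch of the corrected guard, where Event is a hypothetical stand-in for TraceEvent:

#include <cstdio>

struct Event {
    bool enabled = true;
    void disable() { enabled = false; }
    void log(const char* msg) {
        if (enabled)
            std::printf("%s\n", msg);
    }
};

int main() {
    bool traceEnabled = false; // e.g. the rate limiter said "logged too recently"
    Event te;
    if (!traceEnabled) { // was: if (traceEnabled) — the inverted bug
        te.disable();
    }
    te.log("GlobalTagThrottler_GotRate"); // emitted only when logging is allowed
    return 0;
}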
@@ -298,7 +298,7 @@ const char* ShardOpToString(ShardOp op) {
 }
 void logShardEvent(StringRef name, ShardOp op, Severity severity = SevInfo, const std::string& message = "") {
    TraceEvent e(severity, "ShardedRocksDBKVSShardEvent");
-   e.detail("Name", name).detail("Action", ShardOpToString(op));
+   e.detail("ShardId", name).detail("Action", ShardOpToString(op));
    if (!message.empty()) {
        e.detail("Message", message);
    }
@@ -309,7 +309,10 @@ void logShardEvent(StringRef name,
                   Severity severity = SevInfo,
                   const std::string& message = "") {
    TraceEvent e(severity, "ShardedRocksDBKVSShardEvent");
-   e.detail("Name", name).detail("Action", ShardOpToString(op)).detail("Begin", range.begin).detail("End", range.end);
+   e.detail("ShardId", name)
+       .detail("Action", ShardOpToString(op))
+       .detail("Begin", range.begin)
+       .detail("End", range.end);
    if (message != "") {
        e.detail("Message", message);
    }
@@ -652,6 +655,7 @@ struct PhysicalShard {
            logRocksDBError(status, "AddCF");
            return status;
        }
+       logShardEvent(id, ShardOp::OPEN);
        readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
        this->isInitialized.store(true);
        return status;
@@ -665,7 +669,7 @@ struct PhysicalShard {
        rocksdb::ExportImportFilesMetaData metaData = getMetaData(checkpoint);
        if (metaData.files.empty()) {
            TraceEvent(SevInfo, "RocksDBRestoreEmptyShard")
-               .detail("Shard", id)
+               .detail("ShardId", id)
                .detail("CheckpointID", checkpoint.checkpointID);
            status = db->CreateColumnFamily(getCFOptions(), id, &cf);
        } else {
@@ -697,12 +701,12 @@ struct PhysicalShard {
            status = db->IngestExternalFile(cf, sstFiles, ingestOptions);
        } else {
            TraceEvent(SevWarn, "RocksDBServeRestoreEmptyRange")
-               .detail("Shard", id)
+               .detail("ShardId", id)
                .detail("RocksKeyValuesCheckpoint", rcp.toString())
                .detail("Checkpoint", checkpoint.toString());
        }
        TraceEvent(SevInfo, "PhysicalShardRestoredFiles")
-           .detail("Shard", id)
+           .detail("ShardId", id)
            .detail("CFName", cf->GetName())
            .detail("Checkpoint", checkpoint.toString())
            .detail("RestoredFiles", describe(sstFiles));
@@ -765,11 +769,7 @@ struct PhysicalShard {
        readIterPool.reset();

        // Deleting default column family is not allowed.
-       if (id == DEFAULT_CF_NAME) {
-           return;
-       }
-
-       if (deletePending) {
+       if (deletePending && id != DEFAULT_CF_NAME) {
            auto s = db->DropColumnFamily(cf);
            if (!s.ok()) {
                logRocksDBError(s, "DestroyShard");
@@ -916,7 +916,7 @@ public:
        rocksdb::ColumnFamilyMetaData cfMetadata;
        shard->db->GetColumnFamilyMetaData(shard->cf, &cfMetadata);
        TraceEvent e(SevInfo, "PhysicalShardLevelStats");
-       e.detail("PhysicalShardID", id);
+       e.detail("ShardId", id);
        std::string levelProp;
        for (auto it = cfMetadata.levels.begin(); it != cfMetadata.levels.end(); ++it) {
            std::string propValue = "";
@@ -939,7 +939,7 @@ public:
    rocksdb::Status init() {
        const double start = now();
        // Open instance.
-       TraceEvent(SevInfo, "ShardedRocksShardManagerInitBegin", this->logId).detail("DataPath", path);
+       TraceEvent(SevInfo, "ShardedRocksDBInitBegin", this->logId).detail("DataPath", path);
        if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) {
            // Set rate limiter to a higher rate to avoid blocking storage engine initialization.
            auto rateLimiter = rocksdb::NewGenericRateLimiter((int64_t)5 << 30, // 5GB
@@ -961,8 +961,6 @@ public:
            descriptors.push_back(rocksdb::ColumnFamilyDescriptor(name, cfOptions));
        }

-       ASSERT(foundMetadata || descriptors.size() == 0);
-
        // Add default column family if it's a newly opened database.
        if (descriptors.size() == 0) {
            descriptors.push_back(rocksdb::ColumnFamilyDescriptor("default", cfOptions));
@@ -987,8 +985,7 @@ public:
            }
            physicalShards[shard->id] = shard;
            columnFamilyMap[handle->GetID()] = handle;
-           TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId)
-               .detail("PhysicalShardID", shard->id);
+           TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId).detail("ShardId", shard->id);
        }

        std::set<std::string> unusedShards(columnFamilies.begin(), columnFamilies.end());
@@ -1014,7 +1011,7 @@ public:
                metadata[i + 1].key.removePrefix(shardMappingPrefix));
            TraceEvent(SevVerbose, "DecodeShardMapping", this->logId)
                .detail("Range", range)
-               .detail("Name", name);
+               .detail("ShardId", name);

            // Empty name indicates the shard doesn't belong to the SS/KVS.
            if (name.empty()) {
@@ -1050,13 +1047,13 @@ public:
        }

        for (const auto& name : unusedShards) {
-           TraceEvent(SevDebug, "UnusedShardName", logId).detail("Name", name);
            auto it = physicalShards.find(name);
            ASSERT(it != physicalShards.end());
            auto shard = it->second;
            if (shard->dataShards.size() == 0) {
                shard->deleteTimeSec = now();
                pendingDeletionShards.push_back(name);
+               TraceEvent(SevInfo, "UnusedPhysicalShard", logId).detail("ShardId", name);
            }
        }
        if (unusedShards.size() > 0) {
@@ -1101,7 +1098,7 @@ public:
        if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) {
            dbOptions.rate_limiter->SetBytesPerSecond(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC);
        }
-       TraceEvent(SevInfo, "ShardedRocksShardManagerInitEnd", this->logId)
+       TraceEvent(SevInfo, "ShardedRocksDBInitEnd", this->logId)
            .detail("DataPath", path)
            .detail("Duration", now() - start);
        return status;
@@ -1171,9 +1168,7 @@ public:
    }

    PhysicalShard* addRange(KeyRange range, std::string id) {
-       TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId)
-           .detail("Range", range)
-           .detail("PhysicalShardID", id);
+       TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId).detail("Range", range).detail("ShardId", id);

        // Newly added range should not overlap with any existing range.
        auto ranges = dataShardMap.intersectingRanges(range);
@@ -1210,15 +1205,13 @@ public:

        validate();

-       TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId)
-           .detail("Range", range)
-           .detail("PhysicalShardID", id);
+       TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId).detail("Range", range).detail("ShardId", id);

        return shard.get();
    }

    std::vector<std::string> removeRange(KeyRange range) {
-       TraceEvent(SevInfo, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
+       TraceEvent(SevVerbose, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
        std::vector<std::string> shardIds;

        std::vector<DataShard*> newShards;
@@ -1241,7 +1234,7 @@ public:
                auto bytesRead = readRangeInDb(existingShard, range, 1, UINT16_MAX, &rangeResult);
                if (bytesRead > 0) {
                    TraceEvent(SevError, "ShardedRocksDBRangeNotEmpty")
-                       .detail("PhysicalShard", existingShard->toString())
+                       .detail("ShardId", existingShard->toString())
                        .detail("Range", range)
                        .detail("DataShardRange", shardRange);
                    // Force clear range.
@@ -1249,13 +1242,6 @@ public:
                    dirtyShards->insert(it.value()->physicalShard);
                }
            }

-           TraceEvent(SevDebug, "ShardedRocksRemoveRange")
-               .detail("Range", range)
-               .detail("IntersectingRange", shardRange)
-               .detail("DataShardRange", it.value()->range)
-               .detail("PhysicalShard", existingShard->toString());
-
            ASSERT(it.value()->range == shardRange); // Ranges should be consistent.

            if (range.contains(shardRange)) {
@@ -1263,9 +1249,9 @@ public:
                TraceEvent(SevInfo, "ShardedRocksRemovedRange")
                    .detail("Range", range)
                    .detail("RemovedRange", shardRange)
-                   .detail("PhysicalShard", existingShard->toString());
+                   .detail("ShardId", existingShard->toString());
                if (existingShard->dataShards.size() == 0) {
-                   TraceEvent(SevDebug, "ShardedRocksDB").detail("EmptyShardId", existingShard->id);
+                   TraceEvent(SevInfo, "ShardedRocksDBEmptyShard").detail("ShardId", existingShard->id);
                    shardIds.push_back(existingShard->id);
                    existingShard->deleteTimeSec = now();
                    pendingDeletionShards.push_back(existingShard->id);
@@ -1440,7 +1426,7 @@ public:
                    .detail("Action", "PersistRangeMapping")
                    .detail("BeginKey", it.range().begin)
                    .detail("EndKey", it.range().end)
-                   .detail("PhysicalShardID", it.value()->physicalShard->id);
+                   .detail("ShardId", it.value()->physicalShard->id);

            } else {
                // Empty range.
@@ -1449,7 +1435,7 @@ public:
                    .detail("Action", "PersistRangeMapping")
                    .detail("BeginKey", it.range().begin)
                    .detail("EndKey", it.range().end)
-                   .detail("PhysicalShardID", "None");
+                   .detail("ShardId", "None");
            }
            lastKey = it.range().end;
        }
@@ -1524,9 +1510,6 @@ public:
            dbOptions.rate_limiter->SetBytesPerSecond((int64_t)5 << 30);
        }
        columnFamilyMap.clear();
-       for (auto& [_, shard] : physicalShards) {
-           shard->deletePending = true;
-       }
        physicalShards.clear();
        // Close DB.
        auto s = db->Close();
@@ -3240,6 +3223,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
        } catch (Error& e) {
            TraceEvent(SevError, "ShardedRocksCloseReadThreadError").errorUnsuppressed(e);
        }

+       TraceEvent("CloseKeyValueStore").detail("DeleteKVS", deleteOnClose);
        auto a = new Writer::CloseAction(&self->shardManager, deleteOnClose);
        auto f = a->done.getFuture();
        self->writeThread->post(a);
@@ -3663,6 +3648,7 @@ TEST_CASE("noSim/ShardedRocksDB/Initialization") {
    Future<Void> closed = kvStore->onClosed();
    kvStore->dispose();
    wait(closed);
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
@@ -3691,6 +3677,7 @@ TEST_CASE("noSim/ShardedRocksDB/SingleShardRead") {
    Future<Void> closed = kvStore->onClosed();
    kvStore->dispose();
    wait(closed);
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
@@ -3849,7 +3836,7 @@ TEST_CASE("noSim/ShardedRocksDB/RangeOps") {
        kvStore->dispose();
        wait(closed);
    }
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
@@ -3956,6 +3943,7 @@ TEST_CASE("noSim/ShardedRocksDB/ShardOps") {
        kvStore->dispose();
        wait(closed);
    }
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
@@ -4105,7 +4093,7 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
        kvStore->dispose();
        wait(closed);
    }
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
@@ -4413,6 +4401,7 @@ TEST_CASE("perf/ShardedRocksDB/RangeClearSysKey") {
    Future<Void> closed = kvStore->onClosed();
    kvStore->dispose();
    wait(closed);
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
@@ -4473,6 +4462,7 @@ TEST_CASE("perf/ShardedRocksDB/RangeClearUserKey") {
    Future<Void> closed = kvStore->onClosed();
    kvStore->dispose();
    wait(closed);
+   ASSERT(!directoryExists(rocksDBTestDir));
    return Void();
 }
 } // namespace

@@ -0,0 +1,185 @@
+/*
+ * TransactionTagCounter.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbserver/Knobs.h"
+#include "fdbserver/TransactionTagCounter.h"
+#include "flow/Trace.h"
+#include "flow/actorcompiler.h"
+
+class TransactionTagCounterImpl {
+   UID thisServerID;
+   TransactionTagMap<double> intervalCosts;
+   double intervalTotalCost = 0;
+   double intervalStart = 0;
+   int maxTagsTracked;
+   double minRateTracked;
+
+   std::vector<BusyTagInfo> previousBusiestTags;
+   Reference<EventCacheHolder> busiestReadTagEventHolder;
+
+   std::vector<BusyTagInfo> getBusiestTagsFromLastInterval(double elapsed) const {
+       std::priority_queue<BusyTagInfo, std::vector<BusyTagInfo>, std::greater<BusyTagInfo>> topKTags;
+       for (auto const& [tag, cost] : intervalCosts) {
+           auto const rate = cost / elapsed;
+           auto const fractionalBusyness = std::min(1.0, cost / intervalTotalCost);
+           if (rate < minRateTracked) {
+               continue;
+           } else if (topKTags.size() < maxTagsTracked) {
+               topKTags.emplace(tag, rate, fractionalBusyness);
+           } else if (topKTags.top().rate < rate) {
+               topKTags.pop();
+               topKTags.emplace(tag, rate, fractionalBusyness);
+           }
+       }
+       std::vector<BusyTagInfo> result;
+       while (!topKTags.empty()) {
+           result.push_back(std::move(topKTags.top()));
+           topKTags.pop();
+       }
+       return result;
+   }
+
+public:
+   TransactionTagCounterImpl(UID thisServerID, int maxTagsTracked, double minRateTracked)
+     : thisServerID(thisServerID), maxTagsTracked(maxTagsTracked), minRateTracked(minRateTracked),
+       busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
+
+   void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
+       auto const cost = getReadOperationCost(bytes);
+       intervalTotalCost += cost;
+       if (tags.present()) {
+           for (auto const& tag : tags.get()) {
+               CODE_PROBE(true, "Tracking transaction tag in TransactionTagCounter");
+               intervalCosts[TransactionTag(tag, tags.get().getArena())] += cost / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE;
+           }
+       }
+   }
+
+   void startNewInterval() {
+       double elapsed = now() - intervalStart;
+       previousBusiestTags.clear();
+       if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
+           previousBusiestTags = getBusiestTagsFromLastInterval(elapsed);
+
+           // For status, report the busiest tag:
+           if (previousBusiestTags.empty()) {
+               TraceEvent("BusiestReadTag", thisServerID).detail("TagCost", 0.0);
+           } else {
+               auto busiestTagInfo = previousBusiestTags[0];
+               for (int i = 1; i < previousBusiestTags.size(); ++i) {
+                   auto const& tagInfo = previousBusiestTags[i];
+                   if (tagInfo.rate > busiestTagInfo.rate) {
+                       busiestTagInfo = tagInfo;
+                   }
+               }
+               TraceEvent("BusiestReadTag", thisServerID)
+                   .detail("Tag", printable(busiestTagInfo.tag))
+                   .detail("TagCost", busiestTagInfo.rate)
+                   .detail("FractionalBusyness", busiestTagInfo.fractionalBusyness);
+           }
+
+           for (const auto& tagInfo : previousBusiestTags) {
+               TraceEvent("BusyReadTag", thisServerID)
+                   .detail("Tag", printable(tagInfo.tag))
+                   .detail("TagCost", tagInfo.rate)
+                   .detail("FractionalBusyness", tagInfo.fractionalBusyness);
+           }
+       }
+
+       intervalCosts.clear();
+       intervalTotalCost = 0;
+       intervalStart = now();
+   }
+
+   std::vector<BusyTagInfo> const& getBusiestTags() const { return previousBusiestTags; }
+};
+
+TransactionTagCounter::TransactionTagCounter(UID thisServerID, int maxTagsTracked, double minRateTracked)
+  : impl(PImpl<TransactionTagCounterImpl>::create(thisServerID, maxTagsTracked, minRateTracked)) {}
+
+TransactionTagCounter::~TransactionTagCounter() = default;
+
+void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
+   return impl->addRequest(tags, bytes);
+}
+
+void TransactionTagCounter::startNewInterval() {
+   return impl->startNewInterval();
+}
+
+std::vector<BusyTagInfo> const& TransactionTagCounter::getBusiestTags() const {
+   return impl->getBusiestTags();
+}
+
+namespace {
+
+bool containsTag(std::vector<BusyTagInfo> const& busyTags, TransactionTagRef tag) {
+   return std::count_if(busyTags.begin(), busyTags.end(), [tag](auto const& tagInfo) { return tagInfo.tag == tag; }) ==
+          1;
+}
+
+TagSet getTagSet(TransactionTagRef tag) {
+   TagSet result;
+   result.addTag(tag);
+   return result;
+}
+
+} // namespace
+
+TEST_CASE("/fdbserver/TransactionTagCounter/IgnoreBeyondMaxTags") {
+   state TransactionTagCounter counter(UID(),
+                                       /*maxTagsTracked=*/2,
+                                       /*minRateTracked=*/10.0 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE /
+                                           CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
+   counter.startNewInterval();
+   ASSERT_EQ(counter.getBusiestTags().size(), 0);
+   {
+       wait(delay(1.0));
+       counter.addRequest(getTagSet("tagA"_sr), 10 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE);
+       counter.addRequest(getTagSet("tagA"_sr), 10 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE);
+       counter.addRequest(getTagSet("tagB"_sr), 15 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE);
+       counter.addRequest(getTagSet("tagC"_sr), 20 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE);
+       counter.startNewInterval();
+       auto const busiestTags = counter.getBusiestTags();
+       ASSERT_EQ(busiestTags.size(), 2);
+       ASSERT(containsTag(busiestTags, "tagA"_sr));
+       ASSERT(!containsTag(busiestTags, "tagB"_sr));
+       ASSERT(containsTag(busiestTags, "tagC"_sr));
+   }
+   return Void();
+}
+
+TEST_CASE("/fdbserver/TransactionTagCounter/IgnoreBelowMinRate") {
+   state TransactionTagCounter counter(UID(),
+                                       /*maxTagsTracked=*/2,
+                                       /*minRateTracked=*/10.0 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE /
+                                           CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
+   counter.startNewInterval();
+   ASSERT_EQ(counter.getBusiestTags().size(), 0);
+   {
+       wait(delay(1.0));
+       counter.addRequest(getTagSet("tagA"_sr), 5 * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE);
+       counter.startNewInterval();
+       auto const busiestTags = counter.getBusiestTags();
+       ASSERT_EQ(busiestTags.size(), 0);
+   }
+   return Void();
+}

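In getBusiestTagsFromLastInterval above, a tag's rate is its accumulated, sample-rate-adjusted cost divided by the interval length, and its fractional busyness is its share of the interval's total cost, capped at 1.0. A small self-contained sketch of that arithmetic with made-up numbers:

#include <algorithm>
#include <cstdio>

int main() {
    double elapsed = 5.0;            // seconds in the interval
    double intervalTotalCost = 40.0; // summed cost across all tags
    double tagCost = 10.0;           // cost attributed to one tag

    double rate = tagCost / elapsed;                              // 2.0 cost units/sec
    double busyness = std::min(1.0, tagCost / intervalTotalCost); // 0.25 of total
    std::printf("rate=%.2f busyness=%.2f\n", rate, busyness);
    return 0;
}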
@@ -1,227 +0,0 @@
-/*
- * TransactionTagCounter.cpp
- *
- * This source file is part of the FoundationDB open source project
- *
- * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fdbclient/NativeAPI.actor.h"
-#include "fdbserver/Knobs.h"
-#include "fdbserver/TransactionTagCounter.h"
-#include "flow/Trace.h"
-
-namespace {
-
-class TopKTags {
-public:
-   struct TagAndCount {
-       TransactionTag tag;
-       int64_t count;
-       bool operator<(TagAndCount const& other) const { return count < other.count; }
-       explicit TagAndCount(TransactionTag tag, int64_t count) : tag(tag), count(count) {}
-   };
-
-private:
-   // Because the number of tracked is expected to be small, they can be tracked
-   // in a simple vector. If the number of tracked tags increases, a more sophisticated
-   // data structure will be required.
-   std::vector<TagAndCount> topTags;
-   int limit;
-
-public:
-   explicit TopKTags(int limit) : limit(limit) {
-       ASSERT_GT(limit, 0);
-       topTags.reserve(limit);
-   }
-
-   void incrementCount(TransactionTag tag, int previousCount, int increase) {
-       auto iter = std::find_if(topTags.begin(), topTags.end(), [tag](const auto& tc) { return tc.tag == tag; });
-       if (iter != topTags.end()) {
-           ASSERT_EQ(previousCount, iter->count);
-           iter->count += increase;
-       } else if (topTags.size() < limit) {
-           ASSERT_EQ(previousCount, 0);
-           topTags.emplace_back(tag, increase);
-       } else {
-           auto toReplace = std::min_element(topTags.begin(), topTags.end());
-           ASSERT_GE(toReplace->count, previousCount);
-           if (toReplace->count < previousCount + increase) {
-               toReplace->tag = tag;
-               toReplace->count = previousCount + increase;
-           }
-       }
-   }
-
-   std::vector<StorageQueuingMetricsReply::TagInfo> getBusiestTags(double elapsed, double totalSampleCount) const {
-       std::vector<StorageQueuingMetricsReply::TagInfo> result;
-       for (auto const& tagAndCounter : topTags) {
-           auto rate = (tagAndCounter.count / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE) / elapsed;
-           if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE) {
-               result.emplace_back(tagAndCounter.tag, rate, tagAndCounter.count / totalSampleCount);
-           }
-       }
-       return result;
-   }
-
-   void clear() { topTags.clear(); }
-};
-
-} // namespace
-
-class TransactionTagCounterImpl {
-   UID thisServerID;
-   TransactionTagMap<int64_t> intervalCounts;
-   int64_t intervalTotalSampledCount = 0;
-   TopKTags topTags;
-   double intervalStart = 0;
-
-   std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
-   Reference<EventCacheHolder> busiestReadTagEventHolder;
-
-public:
-   TransactionTagCounterImpl(UID thisServerID)
-     : thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
-       busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
-
-   void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
-       if (tags.present()) {
-           CODE_PROBE(true, "Tracking transaction tag in counter");
-           auto const cost = getReadOperationCost(bytes);
-           for (auto& tag : tags.get()) {
-               int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
-               topTags.incrementCount(tag, count, cost);
-               count += cost;
-           }
-
-           intervalTotalSampledCount += cost;
-       }
-   }
-
-   void startNewInterval() {
-       double elapsed = now() - intervalStart;
-       previousBusiestTags.clear();
-       if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
-           previousBusiestTags = topTags.getBusiestTags(elapsed, intervalTotalSampledCount);
-
-           // For status, report the busiest tag:
-           if (previousBusiestTags.empty()) {
-               TraceEvent("BusiestReadTag", thisServerID).detail("TagCost", 0.0);
-           } else {
-               auto busiestTagInfo = previousBusiestTags[0];
-               for (int i = 1; i < previousBusiestTags.size(); ++i) {
-                   auto const& tagInfo = previousBusiestTags[i];
-                   if (tagInfo.rate > busiestTagInfo.rate) {
-                       busiestTagInfo = tagInfo;
-                   }
-               }
-               TraceEvent("BusiestReadTag", thisServerID)
-                   .detail("Tag", busiestTagInfo.tag)
-                   .detail("TagCost", busiestTagInfo.rate)
-                   .detail("FractionalBusyness", busiestTagInfo.fractionalBusyness);
-           }
-
-           for (const auto& tagInfo : previousBusiestTags) {
-               TraceEvent("BusyReadTag", thisServerID)
-                   .detail("Tag", tagInfo.tag)
-                   .detail("TagCost", tagInfo.rate)
-                   .detail("FractionalBusyness", tagInfo.fractionalBusyness);
-           }
-       }
-
-       intervalCounts.clear();
-       intervalTotalSampledCount = 0;
-       topTags.clear();
-       intervalStart = now();
-   }
-
-   std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const { return previousBusiestTags; }
-};
-
-TransactionTagCounter::TransactionTagCounter(UID thisServerID)
-  : impl(PImpl<TransactionTagCounterImpl>::create(thisServerID)) {}
-
-TransactionTagCounter::~TransactionTagCounter() = default;
-
-void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
-   return impl->addRequest(tags, bytes);
-}
-
-void TransactionTagCounter::startNewInterval() {
-   return impl->startNewInterval();
-}
-
-std::vector<StorageQueuingMetricsReply::TagInfo> const& TransactionTagCounter::getBusiestTags() const {
-   return impl->getBusiestTags();
-}
-
-TEST_CASE("/TransactionTagCounter/TopKTags") {
-   TopKTags topTags(2);
-
-   // Ensure that costs are larger enough to show up
-   auto const costMultiplier =
-       std::max<double>(1.0,
-                        2 * SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE *
-                            CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
-
-   ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0);
-   topTags.incrementCount("a"_sr, 0, 1 * costMultiplier);
-   {
-       auto const busiestTags = topTags.getBusiestTags(1.0, 1 * costMultiplier);
-       ASSERT_EQ(busiestTags.size(), 1);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
-                 1);
-   }
-   topTags.incrementCount("b"_sr, 0, 2 * costMultiplier);
-   topTags.incrementCount("c"_sr, 0, 3 * costMultiplier);
-   {
-       auto busiestTags = topTags.getBusiestTags(1.0, 6 * costMultiplier);
-       ASSERT_EQ(busiestTags.size(), 2);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
-                 0);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }),
-                 1);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }),
-                 1);
-   }
-   topTags.incrementCount("a"_sr, 1 * costMultiplier, 3 * costMultiplier);
-   {
-       auto busiestTags = topTags.getBusiestTags(1.0, 9 * costMultiplier);
-       ASSERT_EQ(busiestTags.size(), 2);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
-                 1);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }),
-                 0);
-       ASSERT_EQ(std::count_if(busiestTags.begin(),
-                               busiestTags.end(),
-                               [](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }),
-                 1);
-   }
-   topTags.clear();
-   ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0);
-   return Void();
-}

@@ -72,7 +72,7 @@ public:
    StorageQueuingMetricsReply lastReply;
    bool acceptingRequests;
    limitReason_t limitReason;
-   std::vector<StorageQueuingMetricsReply::TagInfo> busiestReadTags, busiestWriteTags;
+   std::vector<BusyTagInfo> busiestReadTags, busiestWriteTags;

    StorageQueueInfo(const UID& id, const LocalityData& locality);
    StorageQueueInfo(const UID& rateKeeperID, const UID& id, const LocalityData& locality);

@@ -28,7 +28,7 @@ class TransactionTagCounter {
    PImpl<class TransactionTagCounterImpl> impl;

 public:
-   TransactionTagCounter(UID thisServerID);
+   TransactionTagCounter(UID thisServerID, int maxTagsTracked, double minRateTracked);
    ~TransactionTagCounter();

    // Update counters tracking the busyness of each tag in the current interval
@@ -38,5 +38,5 @@ public:
    void startNewInterval();

    // Returns the set of busiest tags as of the end of the last interval
-   std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const;
+   std::vector<BusyTagInfo> const& getBusiestTags() const;
 };

@@ -1641,7 +1641,11 @@ public:
    serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
    instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
    versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
-   lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
+   lastDurableVersionEBrake(0), maxQueryQueue(0),
+   transactionTagCounter(ssi.id(),
+                         /*maxTagsTracked=*/SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED,
+                         /*minRateTracked=*/SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE *
+                             CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE),
    busiestWriteTagContext(ssi.id()), counters(this),
    storageServerSourceTLogIDEventHolder(
        makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")),

@@ -0,0 +1,172 @@
+/*
+ * RestoreMultiRanges.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/FDBTypes.h"
+#include "fdbclient/ReadYourWrites.h"
+#include "fdbrpc/simulator.h"
+#include "fdbclient/BackupAgent.actor.h"
+#include "fdbclient/BackupContainer.h"
+#include "fdbserver/workloads/workloads.actor.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+struct RestoreMultiRangesWorkload : TestWorkload {
+
+   FileBackupAgent backupAgent;
+   Reference<IBackupContainer> backupContainer;
+
+   RestoreMultiRangesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
+
+   static constexpr const char* NAME = "RestoreMultiRanges";
+
+   ACTOR static Future<Void> clearDatabase(Database cx) {
+       state Transaction tr(cx);
+       loop {
+           try {
+               tr.clear(normalKeys);
+               wait(tr.commit());
+               return Void();
+           } catch (Error& e) {
+               wait(tr.onError(e));
+           }
+       }
+   }
+
+   ACTOR static Future<Void> prepareDatabase(Database cx) {
+       state Transaction tr(cx);
+       loop {
+           try {
+               tr.reset();
+               tr.set("a"_sr, "a"_sr);
+               tr.set("aaaa"_sr, "aaaa"_sr);
+               tr.set("b"_sr, "b"_sr);
+               tr.set("bb"_sr, "bb"_sr);
+               tr.set("bbb"_sr, "bbb"_sr);
+               wait(tr.commit());
+               return Void();
+           } catch (Error& e) {
+               wait(tr.onError(e));
+           }
+       }
+   }
+
+   static void logTestData(const VectorRef<KeyValueRef>& data) {
+       TraceEvent("TestFailureDetail").log();
+       int index = 0;
+       for (auto& entry : data) {
+           TraceEvent("CurrentDataEntry")
+               .detail("Index", index)
+               .detail("Key", entry.key.toString())
+               .detail("Value", entry.value.toString());
+           index++;
+       }
+   }
+
+   ACTOR static Future<bool> verifyDatabase(Database cx) {
+       state UID randomID = nondeterministicRandom()->randomUniqueID();
+       TraceEvent("RestoreMultiRanges_Verify").detail("UID", randomID);
+       state Transaction tr(cx);
+       state KeyRangeRef range("a"_sr, "z"_sr);
+       loop {
+           try {
+               tr.reset();
+               tr.debugTransaction(randomID);
+               RangeResult kvs = wait(tr.getRange(range, 10));
+               if (kvs.size() != 4) {
+                   logTestData(kvs);
+                   TraceEvent(SevError, "TestFailureInfo")
+                       .detail("DataSize", kvs.size())
+                       .detail("Expect", 4)
+                       .detail("Workload", NAME);
+                   return false;
+               }
+               KeyRef keys[4] = { "a"_sr, "aaaa"_sr, "bb"_sr, "bbb"_sr };
+               for (size_t i = 0; i < 4; ++i) {
+                   if (kvs[i].key != keys[i]) {
+                       TraceEvent(SevError, "TestFailureInfo")
+                           .detail("ExpectKey", keys[i])
+                           .detail("Got", kvs[i].key)
+                           .detail("Index", i);
+                       return false;
+                   }
+               }
+               TraceEvent("RestoreMultiRanges_VerifyPassed");
+               return true;
+           } catch (Error& e) {
+               wait(tr.onError(e));
+           }
+       }
+   }
+
+   ACTOR static Future<Void> _start(RestoreMultiRangesWorkload* self, Database cx) {
+       TraceEvent("RestoreMultiRanges_StartBackup");
+       wait(clearDatabase(cx));
+       wait(prepareDatabase(cx));
+
+       state std::string backupContainer = "file://simfdb/backups/";
+       state std::string tagName = "default";
+       state Standalone<VectorRef<KeyRangeRef>> backupRanges;
+       backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef("a"_sr, "z"_sr));
+       TraceEvent("RestoreMultiRanges_SubmitBackup");
+       try {
+           wait(self->backupAgent.submitBackup(cx,
+                                               StringRef(backupContainer),
+                                               {},
+                                               deterministicRandom()->randomInt(0, 60),
+                                               deterministicRandom()->randomInt(0, 100),
+                                               tagName,
+                                               backupRanges,
+                                               true,
+                                               StopWhenDone::True));
+       } catch (Error& e) {
+           if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
+               throw;
+       }
+
+       TraceEvent("RestoreMultiRanges_WaitBackup");
+       state Reference<IBackupContainer> container;
+       wait(success(self->backupAgent.waitBackup(cx, tagName, StopWhenDone::True, &container)));
+
+       TraceEvent("RestoreMultiRanges_ClearDatabase");
+       wait(clearDatabase(cx));
+
+       TraceEvent("RestoreMultiRanges_Restore");
+       state Standalone<VectorRef<KeyRangeRef>> ranges;
+       ranges.push_back_deep(ranges.arena(), KeyRangeRef("a"_sr, "aaaaa"_sr));
+       ranges.push_back_deep(ranges.arena(), KeyRangeRef("bb"_sr, "bbbbb"_sr)); // Skip "b"
+       wait(success(self->backupAgent.restore(cx,
+                                              cx,
+                                              Key(tagName),
+                                              Key(container->getURL()),
+                                              {},
+                                              ranges,
+                                              WaitForComplete::True,
+                                              ::invalidVersion,
+                                              Verbose::True)));
+       TraceEvent("RestoreMultiRanges_Success");
+       return Void();
+   }
+
+   Future<Void> setup(Database const& cx) override { return Void(); }
+   Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(this, cx); }
+   Future<bool> check(Database const& cx) override { return verifyDatabase(cx); }
+   void getMetrics(std::vector<PerfMetric>& m) override {}
+};
+
+WorkloadFactory<RestoreMultiRangesWorkload> RestoreMultiRangesWorkloadFactory;

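clearDatabase, prepareDatabase, and verifyDatabase above all use FDB's standard transaction retry idiom: run the body, and on error hand control to onError(), which waits and retries when the error is retryable and rethrows otherwise. A condensed flow-actor sketch of that shape (setOneKey is an illustrative name, assuming the usual fdbclient headers):

ACTOR Future<Void> setOneKey(Database cx) {
    state Transaction tr(cx);
    loop {
        try {
            tr.set("k"_sr, "v"_sr);
            wait(tr.commit());
            return Void();
        } catch (Error& e) {
            wait(tr.onError(e)); // backs off and retries retryable errors, rethrows the rest
        }
    }
}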
@@ -272,6 +272,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES rare/RYWDisable.toml)
   add_fdb_test(TEST_FILES rare/RandomReadWriteTest.toml)
   add_fdb_test(TEST_FILES rare/ReadSkewReadWrite.toml)
+  add_fdb_test(TEST_FILES rare/RestoreMultiRanges.toml)
   add_fdb_test(TEST_FILES rare/SpecificUnitTests.toml)
   add_fdb_test(TEST_FILES rare/StorageQuotaTest.toml)
   add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.toml)

@@ -0,0 +1,11 @@
+[configuration]
+tenantModes = ['disabled']
+
+[[test]]
+testTitle = 'RestoreMultiRanges'
+clearAfterTest = true
+simBackupAgents = 'BackupToFile'
+
+[[test.workload]]
+testName = 'RestoreMultiRanges'