Merge master branch and resolve conflicts

Young Liu 2020-08-24 16:42:31 -07:00
commit 63b3612ad5
30 changed files with 794 additions and 369 deletions

View File

@ -215,6 +215,9 @@ else()
if (USE_AVX512F)
if (CMAKE_HOST_SYSTEM_PROCESSOR MATCHES "^x86")
add_compile_options(-mavx512f)
elseif(USE_VALGRIND)
message(STATUS "USE_VALGRIND=ON make USE_AVX OFF to satisfy valgrind analysis requirement")
set(USE_AVX512F OFF)
else()
message(STATUS "USE_AVX512F is supported on x86 or x86_64 only")
set(USE_AVX512F OFF)
@ -224,6 +227,9 @@ else()
if (USE_AVX)
if (CMAKE_HOST_SYSTEM_PROCESSOR MATCHES "^x86")
add_compile_options(-mavx)
elseif(USE_VALGRIND)
message(STATUS "USE_VALGRIND=ON make USE_AVX OFF to satisfy valgrind analysis requirement")
set(USE_AVX OFF)
else()
message(STATUS "USE_AVX is supported on x86 or x86_64 only")
set(USE_AVX OFF)

View File

@ -80,8 +80,7 @@ namespace Magnesium
TraceFile = file,
DDetails = xEvent.Elements()
.Where(a=>a.Name != "Type" && a.Name != "Time" && a.Name != "Machine" && a.Name != "ID" && a.Name != "Severity" && (!rolledEvent || a.Name != "OriginalTime"))
// When the key contains a colon character, it gets parsed as a:item
.ToDictionary(a=>a.Name.LocalName == "item" ? a.Attribute("item").Value : string.Intern(a.Name.LocalName), a=>(object)a.Value),
.ToDictionary(a=>string.Intern(a.Name.LocalName), a=>(object)a.Value),
original = keepOriginalElement ? xEvent : null
};
}

View File

@ -456,16 +456,20 @@ disable
``throttle disable auto``
Disables cluster auto-throttling for busy transaction tags. This does not disable any currently active throttles. To do so, run the following command after disabling auto-throttling::
> throttle off auto
Disables cluster auto-throttling for busy transaction tags. This may not disable currently active throttles immediately; a delay of a few seconds is expected.
list
^^^^
``throttle list [LIMIT]``
``throttle list [throttled|recommended|all] [LIMIT]``
Prints a list of currently active transaction tag throttles.
Prints a list of currently active transaction tag throttles, or recommended transaction tag throttles if auto-throttling is disabled.
``throttled`` - list active transaction tag throttles.
``recommended`` - list transaction tag throttles recommended by the ratekeeper, but not active yet.
``all`` - list both active and recommended transaction tag throttles.
``LIMIT`` - The number of throttles to print. Defaults to 100.
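For example, to print up to 10 transaction tag throttles recommended by the ratekeeper (an illustrative invocation; actual output depends on the cluster)::
> throttle list recommended 10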

View File

@ -315,11 +315,18 @@
"batch_released_transactions_per_second":0,
"released_transactions_per_second":0,
"throttled_tags":{
"auto":{
"count":0
"auto" : {
"busy_read" : 0,
"busy_write" : 0,
"count" : 0
},
"manual":{
"count":0
"manual" : {
"count" : 1
},
"recommend" : {
"busy_read" : 0,
"busy_write" : 0,
"count" : 0
}
},
"limiting_queue_bytes_storage_server":0,

View File

@ -132,3 +132,13 @@ log_server_min_free_space Log server running out of space (approaching
log_server_min_free_space_ratio Log server running out of space (approaching 5% limit).
storage_server_durability_lag Storage server durable version falling behind.
=================================== ====================================================
The JSON path ``cluster.qos.throttled_tags``, when it exists, is an Object containing ``"auto"``, ``"manual"``, and ``"recommend"``. The possible fields for those objects are listed in the following table:
=================================== ====================================================
Name                                Description
=================================== ====================================================
count                               How many tags are throttled
busy_read                           How many tags are throttled because of busy reads
busy_write                          How many tags are throttled because of busy writes
=================================== ====================================================
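For illustration, a populated ``cluster.qos.throttled_tags`` Object might look like the following status excerpt (the values are examples only)::
  "throttled_tags" : {
      "auto" : {
          "busy_read" : 0,
          "busy_write" : 0,
          "count" : 0
      },
      "manual" : {
          "count" : 1
      },
      "recommend" : {
          "busy_read" : 0,
          "busy_write" : 0,
          "count" : 0
      }
  }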

View File

@ -2572,6 +2572,16 @@ void throttleGenerator(const char* text, const char *line, std::vector<std::stri
const char* opts[] = { "auto", nullptr };
arrayGenerator(text, line, opts, lc);
}
else if(tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
if(tokens.size() == 2) {
const char* opts[] = { "throttled", "recommended", "all", nullptr };
arrayGenerator(text, line, opts, lc);
}
else if(tokens.size() == 3) {
const char* opts[] = {"LIMITS", nullptr};
arrayGenerator(text, line, opts, lc);
}
}
}
void fdbcliCompCmd(std::string const& text, std::vector<std::string>& lc) {
@ -2692,6 +2702,14 @@ std::vector<const char*> throttleHintGenerator(std::vector<StringRef> const& tok
else if((tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) && tokens.size() == 2) {
return { "auto" };
}
else if(tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
if(tokens.size() == 2) {
return { "[throttled|recommended|all]", "[LIMITS]" };
}
else if(tokens.size() == 3 && (tokencmp(tokens[2], "throttled") || tokencmp(tokens[2], "recommended") || tokencmp(tokens[2], "all"))){
return {"[LIMITS]"};
}
}
else if(tokens.size() == 2 && inArgument) {
return { "[ARGS]" };
}
@ -4108,8 +4126,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
else if(tokencmp(tokens[1], "list")) {
if(tokens.size() > 3) {
printf("Usage: throttle list [LIMIT]\n");
if(tokens.size() > 4) {
printf("Usage: throttle list [throttled|recommended|all] [LIMIT]\n");
printf("\n");
printf("Lists tags that are currently throttled.\n");
printf("The default LIMIT is 100 tags.\n");
@ -4117,36 +4135,72 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
state int throttleListLimit = 100;
state bool reportThrottled = true;
state bool reportRecommended = false;
if(tokens.size() >= 3) {
char *end;
throttleListLimit = std::strtol((const char*)tokens[2].begin(), &end, 10);
if ((tokens.size() > 3 && !std::isspace(*end)) || (tokens.size() == 3 && *end != '\0')) {
printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[2]).c_str());
if(tokencmp(tokens[2], "recommended")) {
reportThrottled = false; reportRecommended = true;
}
else if(tokencmp(tokens[2], "all")){
reportThrottled = true; reportRecommended = true;
}
else if(!tokencmp(tokens[2], "throttled")){
printf("ERROR: failed to parse `%s'.\n", printable(tokens[2]).c_str());
is_error = true;
continue;
}
}
std::vector<TagThrottleInfo> tags = wait(ThrottleApi::getThrottledTags(db, throttleListLimit));
state int throttleListLimit = 100;
if(tokens.size() >= 4) {
char *end;
throttleListLimit = std::strtol((const char*)tokens[3].begin(), &end, 10);
if ((tokens.size() > 4 && !std::isspace(*end)) || (tokens.size() == 4 && *end != '\0')) {
printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[3]).c_str());
is_error = true;
continue;
}
}
state std::vector<TagThrottleInfo> tags;
if(reportThrottled && reportRecommended) {
wait(store(tags, ThrottleApi::getThrottledTags(db, throttleListLimit, true)));
}
else if(reportThrottled) {
wait(store(tags, ThrottleApi::getThrottledTags(db, throttleListLimit)));
}
else if(reportRecommended) {
wait(store(tags, ThrottleApi::getRecommendedTags(db, throttleListLimit)));
}
bool anyLogged = false;
for(auto itr = tags.begin(); itr != tags.end(); ++itr) {
if(itr->expirationTime > now()) {
if(!anyLogged) {
printf("Throttled tags:\n\n");
printf(" Rate (txn/s) | Expiration (s) | Priority | Type | Tag\n");
printf(" --------------+----------------+-----------+--------+------------------\n");
printf(" Rate (txn/s) | Expiration (s) | Priority | Type | Reason |Tag\n");
printf(" --------------+----------------+-----------+--------+------------+------\n");
anyLogged = true;
}
printf(" %12d | %13ds | %9s | %6s | %s\n",
(int)(itr->tpsRate),
std::min((int)(itr->expirationTime-now()), (int)(itr->initialDuration)),
transactionPriorityToString(itr->priority, false),
itr->throttleType == TagThrottleType::AUTO ? "auto" : "manual",
itr->tag.toString().c_str());
std::string reasonStr = "unset";
if(itr->reason == TagThrottledReason::MANUAL){
reasonStr = "manual";
}
else if(itr->reason == TagThrottledReason::BUSY_WRITE) {
reasonStr = "busy write";
}
else if(itr->reason == TagThrottledReason::BUSY_READ) {
reasonStr = "busy read";
}
printf(" %12d | %13ds | %9s | %6s | %10s |%s\n", (int)(itr->tpsRate),
std::min((int)(itr->expirationTime - now()), (int)(itr->initialDuration)),
transactionPriorityToString(itr->priority, false),
itr->throttleType == TagThrottleType::AUTO ? "auto" : "manual",
reasonStr.c_str(),
itr->tag.toString().c_str());
}
}
@ -4155,7 +4209,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("Usage: throttle list [LIMIT]\n");
}
if(!anyLogged) {
printf("There are no throttled tags\n");
printf("There are no %s tags\n", reportThrottled ? "throttled" : "recommended");
}
}
else if(tokencmp(tokens[1], "on")) {

View File

@ -225,9 +225,9 @@ private:
// Do the upload, and if it fails forward errors to m_error and also stop if anything else sends an error to m_error
// Also, hold a releaser for the concurrent upload slot while all that is going on.
f->m_parts.back()->etag = holdWhile(std::shared_ptr<FlowLock::Releaser>(new FlowLock::Releaser(f->m_concurrentUploads, 1)),
joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error)
);
auto releaser = std::make_shared<FlowLock::Releaser>(f->m_concurrentUploads, 1);
f->m_parts.back()->etag =
holdWhile(std::move(releaser), joinErrorGroup(doPartUpload(f, f->m_parts.back().getPtr()), f->m_error));
// Make a new part to write to
if(startNew)

View File

@ -349,7 +349,7 @@ public:
std::vector<std::unique_ptr<SpecialKeyRangeReadImpl>> specialKeySpaceModules;
std::unique_ptr<SpecialKeySpace> specialKeySpace;
void registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module, SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl> impl);
std::unique_ptr<SpecialKeyRangeReadImpl> &&impl);
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;

View File

@ -700,7 +700,7 @@ Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
}
void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module, SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl> impl) {
std::unique_ptr<SpecialKeyRangeReadImpl> &&impl) {
specialKeySpace->registerKeyRange(module, type, impl->getKeyRange(), impl.get());
specialKeySpaceModules.push_back(std::move(impl));
}
@ -3694,7 +3694,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
if (info.debugID.present()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trLogInfo->identifier)
.detail("ServerTraceID", info.debugID.get().first());
.detail("ServerTraceID", info.debugID.get());
}
break;
@ -3730,7 +3730,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
if (trLogInfo && !trLogInfo->identifier.empty()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trLogInfo->identifier)
.detail("ServerTraceID", info.debugID.get().first());
.detail("ServerTraceID", info.debugID.get());
}
break;
@ -4124,9 +4124,9 @@ Future<Void> Transaction::onError( Error const& e ) {
return e;
}
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys);
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys);
ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRangeRef keys, Reference<LocationInfo> locationInfo) {
ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRange keys, Reference<LocationInfo> locationInfo) {
loop {
try {
WaitMetricsRequest req(keys, StorageMetrics(), StorageMetrics());
@ -4148,7 +4148,7 @@ ACTOR Future<StorageMetrics> doGetStorageMetrics(Database cx, KeyRangeRef keys,
}
}
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys) {
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys) {
state Span span("NAPI:GetStorageMetricsLargeKeyRange"_loc);
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(
getKeyRangeLocations(cx, keys, std::numeric_limits<int>::max(), false, &StorageServerInterface::waitMetrics,

View File

@ -110,6 +110,31 @@ TagThrottleValue TagThrottleValue::fromValue(const ValueRef& value) {
}
namespace ThrottleApi {
ACTOR Future<bool> getValidAutoEnabled(Transaction* tr, Database db) {
state bool result;
loop {
Optional<Value> value = wait(tr->get(tagThrottleAutoEnabledKey));
if(!value.present()) {
tr->reset();
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
continue;
}
else if(value.get() == LiteralStringRef("1")) {
result = true;
}
else if(value.get() == LiteralStringRef("0")) {
result = false;
}
else {
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", db->dbId).detail("Value", value.get());
tr->reset();
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
continue;
}
return result;
};
}
void signalThrottleChange(Transaction &tr) {
tr.atomicOp(tagThrottleSignalKey, LiteralStringRef("XXXXXXXXXX\x00\x00\x00\x00"), MutationRef::SetVersionstampedValue);
}
@ -146,12 +171,16 @@ namespace ThrottleApi {
return Void();
}
ACTOR Future<std::vector<TagThrottleInfo>> getThrottledTags(Database db, int limit) {
ACTOR Future<std::vector<TagThrottleInfo>> getThrottledTags(Database db, int limit, bool containsRecommend) {
state Transaction tr(db);
state bool reportAuto = containsRecommend;
loop {
try {
Standalone<RangeResultRef> throttles = wait(tr.getRange(tagThrottleKeys, limit));
if (!containsRecommend) {
wait(store(reportAuto, getValidAutoEnabled(&tr, db)));
}
Standalone<RangeResultRef> throttles = wait(tr.getRange(
reportAuto ? tagThrottleKeys : KeyRangeRef(tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix), limit));
std::vector<TagThrottleInfo> results;
for(auto throttle : throttles) {
results.push_back(TagThrottleInfo(TagThrottleKey::fromKey(throttle.key), TagThrottleValue::fromValue(throttle.value)));
@ -164,13 +193,41 @@ namespace ThrottleApi {
}
}
ACTOR Future<Void> throttleTags(Database db, TagSet tags, double tpsRate, double initialDuration, TagThrottleType throttleType, TransactionPriority priority, Optional<double> expirationTime) {
ACTOR Future<std::vector<TagThrottleInfo>> getRecommendedTags(Database db, int limit) {
state Transaction tr(db);
loop {
try {
bool enableAuto = wait(getValidAutoEnabled(&tr, db));
if(enableAuto) {
return std::vector<TagThrottleInfo>();
}
Standalone<RangeResultRef> throttles = wait(tr.getRange(KeyRangeRef(tagThrottleAutoKeysPrefix, tagThrottleKeys.end), limit));
std::vector<TagThrottleInfo> results;
for(auto throttle : throttles) {
results.push_back(TagThrottleInfo(TagThrottleKey::fromKey(throttle.key), TagThrottleValue::fromValue(throttle.value)));
}
return results;
}
catch(Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> throttleTags(Database db, TagSet tags, double tpsRate, double initialDuration,
TagThrottleType throttleType, TransactionPriority priority, Optional<double> expirationTime,
Optional<TagThrottledReason> reason) {
state Transaction tr(db);
state Key key = TagThrottleKey(tags, throttleType, priority).toKey();
ASSERT(initialDuration > 0);
TagThrottleValue throttle(tpsRate, expirationTime.present() ? expirationTime.get() : 0, initialDuration);
if(throttleType == TagThrottleType::MANUAL) {
reason = TagThrottledReason::MANUAL;
}
TagThrottleValue throttle(tpsRate, expirationTime.present() ? expirationTime.get() : 0, initialDuration,
reason.present() ? reason.get() : TagThrottledReason::UNSET);
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue()));
wr << throttle;
state Value value = wr.toValue();

View File

@ -115,6 +115,13 @@ enum class TagThrottleType : uint8_t {
AUTO
};
enum class TagThrottledReason: uint8_t {
UNSET = 0,
MANUAL,
BUSY_READ,
BUSY_WRITE
};
struct TagThrottleKey {
TagSet tags;
TagThrottleType throttleType;
@ -132,17 +139,26 @@ struct TagThrottleValue {
double tpsRate;
double expirationTime;
double initialDuration;
TagThrottledReason reason;
TagThrottleValue() : tpsRate(0), expirationTime(0), initialDuration(0) {}
TagThrottleValue(double tpsRate, double expirationTime, double initialDuration)
: tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration) {}
TagThrottleValue() : tpsRate(0), expirationTime(0), initialDuration(0), reason(TagThrottledReason::UNSET) {}
TagThrottleValue(double tpsRate, double expirationTime, double initialDuration, TagThrottledReason reason)
: tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration), reason(reason) {}
static TagThrottleValue fromValue(const ValueRef& value);
//To change this serialization, ProtocolVersion::TagThrottleValue must be updated, and downgrades need to be considered
template<class Ar>
void serialize(Ar& ar) {
serializer(ar, tpsRate, expirationTime, initialDuration);
if(ar.protocolVersion().hasTagThrottleValueReason()) {
serializer(ar, tpsRate, expirationTime, initialDuration, reinterpret_cast<uint8_t&>(reason));
}
else if(ar.protocolVersion().hasTagThrottleValue()) {
serializer(ar, tpsRate, expirationTime, initialDuration);
if(ar.isDeserializing) {
reason = TagThrottledReason::UNSET;
}
}
}
};
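The protocol-version check in serialize() above is what keeps downgrades safe: peers on an older protocol version never see the trailing reason field, and data written by an older version deserializes with reason left at UNSET. Below is a minimal standalone sketch of the same pattern in plain C++ (not FDB's flow serializer; every name in it is illustrative rather than taken from this commit):
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

enum class Reason : uint8_t { UNSET = 0, MANUAL, BUSY_READ, BUSY_WRITE };

struct ThrottleValue {
    double tpsRate = 0;
    Reason reason = Reason::UNSET;
};

// Hypothetical protocol versions: only version 2 and later carry the reason byte.
constexpr uint64_t VERSION_WITH_REASON = 2;

std::vector<uint8_t> writeValue(const ThrottleValue& v, uint64_t protocolVersion) {
    std::vector<uint8_t> out(sizeof(v.tpsRate));
    std::memcpy(out.data(), &v.tpsRate, sizeof(v.tpsRate));
    if (protocolVersion >= VERSION_WITH_REASON) {
        out.push_back(static_cast<uint8_t>(v.reason)); // newer field is appended last
    }
    return out;
}

ThrottleValue readValue(const std::vector<uint8_t>& in, uint64_t protocolVersion) {
    ThrottleValue v;
    std::memcpy(&v.tpsRate, in.data(), sizeof(v.tpsRate));
    if (protocolVersion >= VERSION_WITH_REASON) {
        v.reason = static_cast<Reason>(in[sizeof(v.tpsRate)]);
    }
    // Otherwise keep the UNSET default, mirroring the deserializing branch above.
    return v;
}

int main() {
    ThrottleValue v;
    v.tpsRate = 50.0;
    v.reason = Reason::BUSY_READ;
    ThrottleValue oldWire = readValue(writeValue(v, 1), 1); // no reason byte on the wire
    ThrottleValue newWire = readValue(writeValue(v, VERSION_WITH_REASON), VERSION_WITH_REASON);
    std::cout << oldWire.tpsRate << " " << static_cast<int>(oldWire.reason) << "\n"; // 50 0
    std::cout << newWire.tpsRate << " " << static_cast<int>(newWire.reason) << "\n"; // 50 2
}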
@ -153,12 +169,13 @@ struct TagThrottleInfo {
double tpsRate;
double expirationTime;
double initialDuration;
TagThrottledReason reason;
TagThrottleInfo(TransactionTag tag, TagThrottleType throttleType, TransactionPriority priority, double tpsRate, double expirationTime, double initialDuration)
: tag(tag), throttleType(throttleType), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration) {}
TagThrottleInfo(TransactionTag tag, TagThrottleType throttleType, TransactionPriority priority, double tpsRate, double expirationTime, double initialDuration, TagThrottledReason reason = TagThrottledReason::UNSET)
: tag(tag), throttleType(throttleType), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration), reason(reason) {}
TagThrottleInfo(TagThrottleKey key, TagThrottleValue value)
: throttleType(key.throttleType), priority(key.priority), tpsRate(value.tpsRate), expirationTime(value.expirationTime), initialDuration(value.initialDuration)
TagThrottleInfo(TagThrottleKey key, TagThrottleValue value)
: throttleType(key.throttleType), priority(key.priority), tpsRate(value.tpsRate), expirationTime(value.expirationTime), initialDuration(value.initialDuration), reason(value.reason)
{
ASSERT(key.tags.size() == 1); // Multiple tags per throttle is not currently supported
tag = *key.tags.begin();
@ -166,10 +183,12 @@ struct TagThrottleInfo {
};
namespace ThrottleApi {
Future<std::vector<TagThrottleInfo>> getThrottledTags(Database const& db, int const& limit);
Future<std::vector<TagThrottleInfo>> getThrottledTags(Database const& db, int const& limit, bool const& containsRecommend = false);
Future<std::vector<TagThrottleInfo>> getRecommendedTags(Database const& db, int const& limit);
Future<Void> throttleTags(Database const& db, TagSet const& tags, double const& tpsRate, double const& initialDuration,
TagThrottleType const& throttleType, TransactionPriority const& priority, Optional<double> const& expirationTime = Optional<double>());
TagThrottleType const& throttleType, TransactionPriority const& priority, Optional<double> const& expirationTime = Optional<double>(),
Optional<TagThrottledReason> const& reason = Optional<TagThrottledReason>());
Future<bool> unthrottleTags(Database const& db, TagSet const& tags, Optional<TagThrottleType> const& throttleType, Optional<TransactionPriority> const& priority);

View File

@ -53,29 +53,17 @@ bool IReplicationPolicy::validateFull(
if (!solved) {
if (validate(totalSolution, fromServers)) {
if (g_replicationdebug > 2) {
printf("Error: Validate unsolved policy with%3lu also servers and%3lu solution servers\n", alsoServers.size(), solutionSet.size());
}
valid = false;
}
else if (validate(fromServers->getGroupEntries(), fromServers)) {
if (g_replicationdebug > 2) {
printf("Error: Validated unsolved policy with all%5d servers\n", fromServers->size());
}
valid = false;
}
}
else if (!validate(totalSolution, fromServers)) {
if (g_replicationdebug > 2) {
printf("Error: Failed to validate solved policy with%3lu also servers and%3lu solution servers\n", alsoServers.size(), solutionSet.size());
}
valid = false;
}
else if (solutionSet.empty()) {
if (!validate(alsoServers, fromServers)) {
if (g_replicationdebug > 2) {
printf("Error: Failed to validate policy with only%3lu also servers\n", alsoServers.size());
}
valid = false;
}
}
@ -85,14 +73,7 @@ bool IReplicationPolicy::validateFull(
totalSolution[lastSolutionIndex] = totalSolution.back();
totalSolution.pop_back();
for (int index = 0; index < solutionSet.size() && index < totalSolution.size(); index ++) {
if (g_replicationdebug > 3) {
auto fromServer = fromServers->getRecordViaEntry(missingEntry);
printf("Test remove entry: %s test:%3d of%3lu\n", fromServers->getEntryInfo(missingEntry).c_str(), index+1, solutionSet.size());
}
if (validate(totalSolution, fromServers)) {
if (g_replicationdebug > 2) {
printf("Invalid extra entry: %s\n", fromServers->getEntryInfo(missingEntry).c_str());
}
valid = false;
break;
}
@ -119,9 +100,6 @@ bool PolicyOne::selectReplicas(
itemsUsed ++;
totalUsed ++;
}
if (g_replicationdebug > 0) {
printf("PolicyOne used:%5d results:%3d from %3d servers\n", totalUsed, itemsUsed, fromServers->size());
}
return (totalUsed > 0);
}
@ -205,50 +183,16 @@ bool PolicyAcross::validate(
}
}
if (validMap.size() < _count) {
if (g_replicationdebug > 3) {
printf("Across too few values:%3lu <%2d key: %-7s policy: %-10s => %s\n", validMap.size(), _count, _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
}
valid = false;
}
else {
if (g_replicationdebug > 3) {
printf("Across check values:%9lu key: %-7s solutions:%2lu count:%2d policy: %-10s => %s\n", validMap.size(), _attribKey.c_str(), solutionSet.size(), _count, _policy->name().c_str(), _policy->info().c_str());
for (auto& itValue : validMap) {
printf(" value: (%3d) %-10s\n", itValue.first._id, fromServers->valueText(itValue.first).c_str());
}
}
for (auto& itValid : validMap) {
// itValid.second is the vector of LocalityEntries that belong to the same locality
if (_policy->validate(itValid.second, fromServers)) {
if (g_replicationdebug > 4) {
printf("Across valid solution: %6lu key: %-7s count:%3d of%3d value: (%3d) %-10s policy: %-10s => "
"%s\n",
itValid.second.size(), _attribKey.c_str(), count + 1, _count, itValid.first._id,
fromServers->valueText(itValid.first).c_str(), _policy->name().c_str(),
_policy->info().c_str());
if (g_replicationdebug > 5) {
for (auto& entry : itValid.second) {
printf(" entry: %s\n", fromServers->getEntryInfo(entry).c_str());
}
}
}
count ++;
} else if (g_replicationdebug > 4) {
printf("Across invalid solution:%5lu key: %-7s value: (%3d) %-10s policy: %-10s => %s\n", itValid.second.size(), _attribKey.c_str(), itValid.first._id, fromServers->valueText(itValid.first).c_str(), _policy->name().c_str(), _policy->info().c_str());
if (g_replicationdebug > 5) {
for (auto& entry : itValid.second) {
printf(" entry: %s\n", fromServers->getEntryInfo(entry).c_str());
}
}
}
}
if (count < _count) {
if (g_replicationdebug > 3) {
printf("Across failed solution: %3lu key: %-7s values:%3lu count: %d=%d policy: %-10s => %s\n", solutionSet.size(), _attribKey.c_str(), validMap.size(), count, _count, _policy->name().c_str(), _policy->info().c_str());
for (auto& entry : solutionSet) {
printf(" entry: %s\n", fromServers->getEntryInfo(entry).c_str());
}
}
valid = false;
}
}
@ -277,9 +221,6 @@ bool PolicyAcross::selectReplicas(
_newResults.clear();
_addedResults.resize(_arena, 0);
if (g_replicationdebug > 0) {
printf("Across !also:%4lu key: %-7s policy: %-10s => %s\n", alsoServers.size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
}
for (auto& alsoServer : alsoServers) {
auto value = fromServers->getValueViaGroupKey(alsoServer, groupIndexKey);
if (value.present()) {
@ -287,16 +228,6 @@ bool PolicyAcross::selectReplicas(
if ((lowerBound == _usedValues.end()) || (*lowerBound != value.get())) {
//_selected is a set of processes that have the same indexKey and indexValue (value)
_selected = fromServers->restrict(indexKey, value.get());
if (g_replicationdebug > 0) {
if (_selected->size() > 0) {
// entry is the locality entry info (entryValue) from the to-be-selected team member alsoServer
printf("Across !select key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(),
value.get()._id, fromServers->valueText(value.get()).c_str(),
fromServers->getEntryInfo(alsoServer).c_str());
} else {
printf("Across !select empty\n");
}
}
if (_selected->size()) {
// Pass only the also array item which are valid for the value
resultsSize = _newResults.size();
@ -321,11 +252,6 @@ bool PolicyAcross::selectReplicas(
if ((count < _count) && (_addedResults.size())) {
// Sort the added results array
std::sort(_addedResults.begin(), _addedResults.end(), PolicyAcross::compareAddedResults);
if (g_replicationdebug > 0) {
printf("Across !add sets key: %-7s sets:%3d results:%3lu count:%3d of%3d\n", _attribKey.c_str(), _addedResults.size(), _newResults.size(), count, _count);
}
if (g_replicationdebug > 0) {
LocalitySet::staticDisplayEntries(fromServers, alsoServers, "also");
LocalitySet::staticDisplayEntries(fromServers, results, "results");
@ -334,9 +260,6 @@ bool PolicyAcross::selectReplicas(
for (auto& addedResult : _addedResults) {
count ++;
if (g_replicationdebug > 0) {
printf("Across !add set key: %-7s count:%3d of%3d results:%3d index:%3d\n", _attribKey.c_str(), count, _count, addedResult.first, addedResult.second);
}
results.reserve(results.size() + addedResult.first);
results.insert(results.end(), _newResults.begin()+addedResult.second, _newResults.begin()+addedResult.second+addedResult.first);
if (count >= _count) break;
@ -349,9 +272,6 @@ bool PolicyAcross::selectReplicas(
// Cannot find replica from the least used alsoServers, now try to find replicas from all servers
// Process the remaining values
if (count < _count) {
if (g_replicationdebug > 0) {
printf("Across items:%4d key: %-7s policy: %-10s => %s count:%3d of%3d\n", fromServers->size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str(), count, _count);
}
int recordIndex;
// Use mutable array so that swaps does not affect actual element array
auto& mutableArray = fromServers->getMutableEntries();
@ -367,20 +287,8 @@ bool PolicyAcross::selectReplicas(
if ((lowerBound == _usedValues.end()) || (*lowerBound != value.get())) {
_selected = fromServers->restrict(indexKey, value.get());
if (_selected->size()) {
if (g_replicationdebug > 5) {
printf("Across select:%3d key: %-7s value: (%3d) %-10s entry: %s index:%4d\n",
fromServers->size() - checksLeft + 1, _attribKey.c_str(), value.get()._id,
fromServers->valueText(value.get()).c_str(),
fromServers->getEntryInfo(entry).c_str(), recordIndex);
}
if (_policy->selectReplicas(_selected, emptyEntryArray, results))
{
if (g_replicationdebug > 5) {
printf("Across added:%4d key: %-7s value: (%3d) %-10s policy: %-10s => %s needed:%3d\n",
count + 1, _attribKey.c_str(), value.get()._id,
fromServers->valueText(value.get()).c_str(), _policy->name().c_str(),
_policy->info().c_str(), _count);
}
count ++;
if (count >= _count) break;
_usedValues.insert(lowerBound, value.get());
@ -395,13 +303,9 @@ bool PolicyAcross::selectReplicas(
}
// Clear the return array, if not satisfied
if (count < _count) {
if (g_replicationdebug > 0) printf("Across result count: %d < %d requested\n", count, _count);
results.resize(resultsInit);
count = 0;
}
if (g_replicationdebug > 0) {
printf("Across used:%5lu results:%3d from %3d items key: %-7s policy: %-10s => %s\n", results.size()-resultsInit, count, fromServers->size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
}
return (count >= _count);
}

View File

@ -2075,7 +2075,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (auto& server : serverTeam) {
score += server_info[server]->teams.size();
}
TraceEvent("BuildServerTeams")
TraceEvent(SevDebug, "BuildServerTeams")
.detail("Score", score)
.detail("BestScore", bestScore)
.detail("TeamSize", serverTeam.size())
@ -4826,6 +4826,21 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
return Void();
}
ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req, PromiseStream<GetMetricsListRequest> getShardMetricsList) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if(result.isError()) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
return Void();
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<Void> dbInfoChange = db->onChange();
if (!setDDEnabled(false, snapReq.snapUID)) {
@ -5000,16 +5015,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
break;
}
when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if ( result.isError() ) {
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
req.reply.send(rep);
}
when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
actors.add(ddGetMetrics(req, getShardMetricsList));
}
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db));

View File

@ -105,6 +105,10 @@ public:
// Free pageID to be used again after the commit that moves oldestVersion past v
virtual void freePage(LogicalPageID pageID, Version v) = 0;
// If id is remapped, delete the original as of version v and return the page it was remapped to. The caller
// is then responsible for referencing and deleting the returned page ID.
virtual LogicalPageID detachRemappedPage(LogicalPageID id, Version v) = 0;
// Returns the latest data (regardless of version) for a page by LogicalPageID
// The data returned will be the later of
// - the most recent committed atomic
@ -133,7 +137,7 @@ public:
virtual StorageBytes getStorageBytes() const = 0;
// Count of pages in use by the pager client
// Count of pages in use by the pager client (including retained old page versions)
virtual Future<int64_t> getUserPageCount() = 0;
// Future returned is ready when pager has been initialized from disk and is ready for reads and writes.

View File

@ -2,6 +2,7 @@
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "flow/flow.h"
#include "flow/IThreadPool.h"
@ -22,14 +23,23 @@ StringRef toStringRef(rocksdb::Slice s) {
return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
}
rocksdb::Options getOptions() {
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::ColumnFamilyOptions options;
options.level_compaction_dynamic_level_bytes = true;
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
// Compact sstables when there's too much deleted stuff.
options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
return options;
}
rocksdb::ColumnFamilyOptions getCFOptions() {
return {};
rocksdb::Options getOptions() {
rocksdb::Options options({}, getCFOptions());
options.avoid_unnecessary_blocking_io = true;
options.create_if_missing = true;
if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
}
return options;
}
struct RocksDBKeyValueStore : IKeyValueStore {
@ -119,7 +129,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
struct Reader : IThreadPoolReceiver {
DB& db;
rocksdb::ReadOptions readOptions;
explicit Reader(DB& db) : db(db) {}
@ -141,7 +150,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
rocksdb::PinnableSlice value;
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
traceBatch.get().dump();
@ -172,7 +181,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
@ -195,33 +204,51 @@ struct RocksDBKeyValueStore : IKeyValueStore {
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
Standalone<RangeResultRef> result;
if (a.rowLimit == 0 || a.byteLimit == 0) {
a.result.send(result);
}
int accumulatedBytes = 0;
rocksdb::Status s;
if (a.rowLimit >= 0) {
rocksdb::ReadOptions options;
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Next()` is potentially expensive, so short-circuit here just in case.
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Next();
}
s = cursor->status();
} else {
rocksdb::ReadOptions options;
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
}
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Prev()` is potentially expensive, so short-circuit here just in case.
if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Prev();
}
s = cursor->status();
}
auto s = cursor->status();
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
}

View File

@ -93,7 +93,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PEEK_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0;
init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 0 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
@ -236,7 +236,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
init( DD_ENABLE_VERBOSE_TRACING, false ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = true;
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_ALLOWED_VERSIONLAG, 200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }
init( DD_SS_STUCK_TIME_LIMIT, 300.0 ); if( randomize && BUGGIFY ) { DD_SS_STUCK_TIME_LIMIT = 200.0 + deterministicRandom()->random01() * 100.0; }
@ -308,6 +308,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// KeyValueStoreMemory
init( REPLACE_CONTENTS_BYTES, 1e5 );
// KeyValueStoreRocksDB
init( ROCKSDB_BACKGROUND_PARALLELISM, 0 );
init( ROCKSDB_MEMTABLE_BYTES, 512 * 1024 * 1024 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
init( MAX_NOTIFICATIONS, 100000 );
@ -574,7 +578,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS, 10 );
init( TRACE_LOG_PING_TIMEOUT_SECONDS, 5.0 );
init( MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 10.0 );
init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 30.0 );
init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 30.0 );
init( DBINFO_FAILED_DELAY, 1.0 );
// Test harness
@ -651,7 +655,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REDWOOD_REMAP_CLEANUP_WINDOW, 50 );
init( REDWOOD_REMAP_CLEANUP_LAG, 0.1 );
init( REDWOOD_LOGGING_INTERVAL, 5.0 );
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );

View File

@ -243,6 +243,10 @@ public:
// KeyValueStoreMemory
int64_t REPLACE_CONTENTS_BYTES;
// KeyValueStoreRocksDB
int ROCKSDB_BACKGROUND_PARALLELISM;
int64_t ROCKSDB_MEMTABLE_BYTES;
// Leader election
int MAX_NOTIFICATIONS;
int MIN_NOTIFICATIONS;

View File

@ -723,7 +723,7 @@ void determineCommittedTransactions(CommitBatchContext* self) {
self->lockedKey = pProxyCommitData->txnStateStore->readValue(databaseLockedKey).get();
self->locked = self->lockedKey.present() && self->lockedKey.get().size();
const auto& mustContainSystemKey = pProxyCommitData->txnStateStore->readValue(mustContainSystemMutationsKey).get();
const Optional<Value> mustContainSystemKey = pProxyCommitData->txnStateStore->readValue(mustContainSystemMutationsKey).get();
if (mustContainSystemKey.present() && mustContainSystemKey.get().size()) {
for (int t = 0; t < trs.size(); t++) {
if( self->committed[t] == ConflictBatch::TransactionCommitted ) {

View File

@ -338,7 +338,7 @@ public:
return Optional<ClientTagThrottleLimits>();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates(bool autoThrottlingEnabled) {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
for(auto tagItr = tagData.begin(); tagItr != tagData.end();) {
@ -401,14 +401,18 @@ public:
}
tagPresent = true;
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if(!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
if (autoThrottlingEnabled) {
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(
tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if (!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second =
ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
} else {
TEST(true); // Auto throttle overridden by manual throttle
}
clientRates[TransactionPriority::BATCH][tagItr->first] =
ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
else {
TEST(true); // Auto throttle overridden by manual throttle
}
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
else {
ASSERT(autoItr->second.limits.expiration <= now());
@ -481,6 +485,7 @@ public:
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
TransactionTagMap<RkTagData> tagData;
uint32_t busyReadTagCount = 0, busyWriteTagCount = 0;
};
struct RatekeeperLimits {
@ -786,6 +791,8 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id).detail("Value", autoThrottlingEnabled.get().get());
}
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
if(!committed)
tr.set(tagThrottleAutoEnabledKey, LiteralStringRef(self->autoThrottlingEnabled ? "1" : "0"));
}
RkTagThrottleCollection updatedTagThrottles;
@ -813,6 +820,12 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
if(tagKey.throttleType == TagThrottleType::AUTO) {
updatedTagThrottles.autoThrottleTag(self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
if(tagValue.reason == TagThrottledReason::BUSY_READ){
updatedTagThrottles.busyReadTagCount ++;
}
else if(tagValue.reason == TagThrottledReason::BUSY_WRITE) {
updatedTagThrottles.busyWriteTagCount ++;
}
}
else {
updatedTagThrottles.manualThrottleTag(self->id, tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime, oldLimits);
@ -848,7 +861,10 @@ void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
TagSet tags;
tags.addTag(ss.busiestTag.get());
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
self->addActor.send(ThrottleApi::throttleTags(
self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO,
TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
TagThrottledReason::BUSY_READ));
}
}
}
@ -1195,7 +1211,10 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
.detail("TagsAutoThrottled", self->throttledTags.autoThrottleCount())
.detail("TagsAutoThrottledBusyRead", self->throttledTags.busyReadTagCount)
.detail("TagsAutoThrottledBusyWrite", self->throttledTags.busyWriteTagCount)
.detail("TagsManuallyThrottled", self->throttledTags.manualThrottleCount())
.detail("AutoThrottlingEnabled", self->autoThrottlingEnabled)
.trackLatest(name);
}
}
@ -1305,7 +1324,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
p.lastThrottledTagChangeId = self.throttledTagChangeId;
p.lastTagPushTime = now();
reply.throttledTags = self.throttledTags.getClientRates();
reply.throttledTags = self.throttledTags.getClientRates(self.autoThrottlingEnabled);
TEST(reply.throttledTags.present() && reply.throttledTags.get().size() > 0); // Returning tag throttles to a proxy
}

View File

@ -185,7 +185,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
state int retries = 0;
state double numOps = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent(delayTime > 5 ? SevWarnAlways : SevInfo, "FastRestoreApplierClearRangeMutationsStart", applierID)
TraceEvent(delayTime > 5 ? SevWarnAlways : SevDebug, "FastRestoreApplierClearRangeMutationsStart", applierID)
.detail("BatchIndex", batchIndex)
.detail("Ranges", ranges.size())
.detail("DelayTime", delayTime);
@ -296,7 +296,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].get().present()) { // Key not exist in DB
// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
.suppressFor(5.0)
.detail("BatchIndex", batchIndex)
.detail("Key", key.first)
@ -304,7 +304,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
.detail("PendingMutations", key.second->second.pendingMutations.size())
.detail("StagingKeyType", getTypeString(key.second->second.type));
for (auto& vm : key.second->second.pendingMutations) {
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
.detail("PendingMutationVersion", vm.first.toString())
.detail("PendingMutation", vm.second.toString());
}

View File

@ -300,7 +300,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
state std::vector<RestoreFileFR> logFiles;
state std::vector<RestoreFileFR> allFiles;
state Version minRangeVersion = MAX_VERSION;
state ActorCollection actors(false);
state Future<Void> error = actorCollection(self->addActor.getFuture());
self->initBackupContainer(request.url);
@ -356,7 +356,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
}
}
actors.add(monitorFinishedVersion(self, request));
self->addActor.send(monitorFinishedVersion(self, request));
state std::vector<VersionBatch>::iterator versionBatch = versionBatches.begin();
for (; versionBatch != versionBatches.end(); versionBatch++) {
while (self->runningVersionBatches.get() >= SERVER_KNOBS->FASTRESTORE_VB_PARALLELISM && !releaseVBOutOfOrder) {
@ -378,7 +378,11 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
wait(delay(SERVER_KNOBS->FASTRESTORE_VB_LAUNCH_DELAY));
}
wait(waitForAll(fBatches));
try {
wait(waitForAll(fBatches) || error);
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreControllerDispatchVersionBatchesUnexpectedError").error(e);
}
TraceEvent("FastRestoreController").detail("RestoreToVersion", request.targetVersion);
return request.targetVersion;

View File

@ -149,6 +149,10 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
std::map<UID, double> rolesHeartBeatTime; // Key: role id; Value: most recent time controller receives heart beat
// addActor: add to actorCollection so that when an actor has an error, the ActorCollection can catch the error.
// addActor is used to create the actorCollection when the RestoreController is created
PromiseStream<Future<Void>> addActor;
void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }

View File

@ -1786,11 +1786,14 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
state TraceEventFields ratekeeper = wait( timeoutError(rkWorker.interf.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(rkWorker.interf.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
bool autoThrottlingEnabled = ratekeeper.getInt("AutoThrottlingEnabled");
double tpsLimit = ratekeeper.getDouble("TPSLimit");
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
int autoThrottledTags = ratekeeper.getInt("TagsAutoThrottled");
int autoThrottledTagsBusyRead = ratekeeper.getInt("TagsAutoThrottledBusyRead");
int autoThrottledTagsBusyWrite = ratekeeper.getInt("TagsAutoThrottledBusyWrite");
int manualThrottledTags = ratekeeper.getInt("TagsManuallyThrottled");
int ssCount = ratekeeper.getInt("StorageServers");
int tlogCount = ratekeeper.getInt("TLogs");
@ -1820,9 +1823,28 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
JsonBuilderObject throttledTagsObj;
JsonBuilderObject autoThrottledTagsObj;
autoThrottledTagsObj["count"] = autoThrottledTags;
JsonBuilderObject autoThrottledTagsObj, recommendThrottleTagsObj;
if(autoThrottlingEnabled) {
autoThrottledTagsObj["count"] = autoThrottledTags;
autoThrottledTagsObj["busy_read"] = autoThrottledTagsBusyRead;
autoThrottledTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
recommendThrottleTagsObj["count"] = 0;
recommendThrottleTagsObj["busy_read"] = 0;
recommendThrottleTagsObj["busy_write"] = 0;
}
else {
recommendThrottleTagsObj["count"] = autoThrottledTags;
recommendThrottleTagsObj["busy_read"] = autoThrottledTagsBusyRead;
recommendThrottleTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
autoThrottledTagsObj["count"] = 0;
autoThrottledTagsObj["busy_read"] = 0;
autoThrottledTagsObj["busy_write"] = 0;
}
throttledTagsObj["auto"] = autoThrottledTagsObj;
throttledTagsObj["recommend"] = recommendThrottleTagsObj;
JsonBuilderObject manualThrottledTagsObj;
manualThrottledTagsObj["count"] = manualThrottledTags;

View File

@ -92,7 +92,18 @@ std::string toString(LogicalPageID id) {
if (id == invalidLogicalPageID) {
return "LogicalPageID{invalid}";
}
return format("LogicalPageID{%" PRId64 "}", id);
return format("LogicalPageID{%u}", id);
}
std::string toString(Version v) {
if (v == invalidVersion) {
return "invalidVersion";
}
return format("@%" PRId64, v);
}
std::string toString(bool b) {
return b ? "true" : "false";
}
template <typename T>
@ -136,6 +147,11 @@ std::string toString(const Optional<T>& o) {
return "<not present>";
}
template <typename F, typename S>
std::string toString(const std::pair<F, S>& o) {
return format("{%s, %s}", toString(o.first).c_str(), toString(o.second).c_str());
}
// A FIFO queue of T stored as a linked list of pages.
// Main operations are pop(), pushBack(), pushFront(), and flush().
//
@ -765,6 +781,8 @@ struct RedwoodMetrics {
unsigned int lazyClearRequeueExt;
unsigned int lazyClearFree;
unsigned int lazyClearFreeExt;
unsigned int forceUpdate;
unsigned int detachChild;
double buildStoredPct;
double buildFillPct;
unsigned int buildItemCount;
@ -797,6 +815,12 @@ struct RedwoodMetrics {
unsigned int btreeLeafPreload;
unsigned int btreeLeafPreloadExt;
// Return number of pages read or written, from cache or disk
unsigned int pageOps() const {
// All page reads are either a cache hit, probe hit, or a disk read
return pagerDiskWrite + pagerDiskRead + pagerCacheHit + pagerProbeHit;
}
double startTime;
Level& level(unsigned int level) {
@ -807,9 +831,9 @@ struct RedwoodMetrics {
return levels[level - 1];
}
// This will populate a trace event and/or a string with Redwood metrics. The string is a
// reasonably well formatted page of information
void getFields(TraceEvent* e, std::string* s = nullptr) {
// This will populate a trace event and/or a string with Redwood metrics.
// The string is a reasonably well formatted page of information
void getFields(TraceEvent* e, std::string* s = nullptr, bool skipZeroes = false) {
std::pair<const char*, unsigned int> metrics[] = { { "BTreePreload", btreeLeafPreload },
{ "BTreePreloadExt", btreeLeafPreloadExt },
{ "", 0 },
@ -837,21 +861,26 @@ struct RedwoodMetrics {
{ "PagerRemapCopy", pagerRemapCopy },
{ "PagerRemapSkip", pagerRemapSkip } };
double elapsed = now() - startTime;
for (auto& m : metrics) {
if (*m.first == '\0') {
if (s != nullptr) {
*s += "\n";
}
} else {
if (s != nullptr) {
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
}
if (e != nullptr) {
if (e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if(c != 0 && (!skipZeroes || m.second != 0) ) {
e->detail(m.first, m.second);
}
}
}
if(s != nullptr) {
for (auto& m : metrics) {
if (*m.first == '\0') {
*s += "\n";
} else if(!skipZeroes || m.second != 0) {
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
}
}
}
for (int i = 0; i < btreeLevels; ++i) {
auto& level = levels[i];
std::pair<const char*, unsigned int> metrics[] = {
@ -869,37 +898,44 @@ struct RedwoodMetrics {
{ "LazyClear", level.lazyClearFree },
{ "LazyClearExt", level.lazyClearFreeExt },
{ "", 0 },
{ "ForceUpdate", level.forceUpdate },
{ "DetachChild", level.detachChild },
{ "", 0 },
{ "-BldAvgCount", level.pageBuild ? level.buildItemCount / level.pageBuild : 0 },
{ "-BldAvgFillPct", level.pageBuild ? level.buildFillPct / level.pageBuild * 100 : 0 },
{ "-BldAvgStoredPct", level.pageBuild ? level.buildStoredPct / level.pageBuild * 100 : 0 },
{ "", 0 },
{ "-ModAvgCount", level.pageModify ? level.modifyItemCount / level.pageModify : 0 },
{ "-ModAvgFillPct", level.pageModify ? level.modifyFillPct / level.pageModify * 100 : 0 },
{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 }
{ "-ModAvgStoredPct", level.pageModify ? level.modifyStoredPct / level.pageModify * 100 : 0 },
{ "", 0 },
};
if(e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if(c != 0 && (!skipZeroes || m.second != 0) ) {
e->detail(format("L%d%s", i + 1, m.first + (c == '-' ? 1 : 0)), m.second);
}
}
}
if (s != nullptr) {
*s += format("\nLevel %d\n\t", i + 1);
}
for (auto& m : metrics) {
const char* name = m.first;
bool rate = elapsed != 0;
if (*name == '-') {
++name;
rate = false;
}
if (*name == '\0') {
if (s != nullptr) {
for (auto& m : metrics) {
const char* name = m.first;
bool rate = elapsed != 0;
if (*name == '-') {
++name;
rate = false;
}
if (*name == '\0') {
*s += "\n\t";
}
} else {
if (s != nullptr) {
} else if(!skipZeroes || m.second != 0) {
*s += format("%-15s %8u %8u/s ", name, m.second, rate ? int(m.second / elapsed) : 0);
}
if (e != nullptr) {
e->detail(format("L%d%s", i + 1, name), m.second);
}
}
}
}
@ -1124,22 +1160,32 @@ public:
};
struct RemappedPage {
RemappedPage() : version(invalidVersion) {}
RemappedPage(Version v, LogicalPageID o, LogicalPageID n) : version(v), originalPageID(o), newPageID(n) {}
enum Type { NONE = 'N', REMAP = 'R', FREE = 'F', DETACH = 'D' };
RemappedPage(Version v = invalidVersion, LogicalPageID o = invalidLogicalPageID, LogicalPageID n = invalidLogicalPageID) : version(v), originalPageID(o), newPageID(n) {}
Version version;
LogicalPageID originalPageID;
LogicalPageID newPageID;
bool isFree() const {
return newPageID == invalidLogicalPageID;
static Type getTypeOf(LogicalPageID newPageID) {
if(newPageID == invalidLogicalPageID) {
return FREE;
}
if(newPageID == 0) {
return DETACH;
}
return REMAP;
}
Type getType() const {
return getTypeOf(newPageID);
}
bool operator<(const RemappedPage& rhs) { return version < rhs.version; }
std::string toString() const {
return format("RemappedPage(%s -> %s @%" PRId64 "}", ::toString(originalPageID).c_str(),
::toString(newPageID).c_str(), version);
return format("RemappedPage(%c: %s -> %s %s}", getType(), ::toString(originalPageID).c_str(),
::toString(newPageID).c_str(), ::toString(version).c_str());
}
};
@ -1484,6 +1530,35 @@ public:
}
}
LogicalPageID detachRemappedPage(LogicalPageID pageID, Version v) override {
auto i = remappedPages.find(pageID);
if(i == remappedPages.end()) {
// Page is not remapped
return invalidLogicalPageID;
}
// Get the page that id was most recently remapped to
auto iLast = i->second.rbegin();
LogicalPageID newID = iLast->second;
ASSERT(RemappedPage::getTypeOf(newID) == RemappedPage::REMAP);
// If the most recent remap was also at v then change it to a delete, as it's essentially
// the same as the original page being deleted at that version and newID being used from then on.
if(iLast->first == v) {
debug_printf("DWALPager(%s) op=detachDelete originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
iLast->second = invalidLogicalPageID;
remapQueue.pushBack(RemappedPage{ v, pageID, invalidLogicalPageID });
} else {
debug_printf("DWALPager(%s) op=detach originalID=%s newID=%s @%" PRId64 " oldestVersion=%" PRId64 "\n", filename.c_str(),
toString(pageID).c_str(), toString(newID).c_str(), v, pLastCommittedHeader->oldestVersion);
// Mark id as converted to its last remapped location as of v
i->second[v] = 0;
remapQueue.pushBack(RemappedPage{ v, pageID, 0 });
}
return newID;
}
void freePage(LogicalPageID pageID, Version v) override {
// If pageID has been remapped, then it can't be freed until all existing remaps for that page have been undone,
// so queue it for later deletion
@ -1588,13 +1663,13 @@ public:
auto j = i->second.upper_bound(v);
if (j != i->second.begin()) {
--j;
debug_printf("DWALPager(%s) read %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
debug_printf("DWALPager(%s) op=readAtVersionRemapped %s @%" PRId64 " -> %s\n", filename.c_str(), toString(pageID).c_str(),
v, toString(j->second).c_str());
pageID = j->second;
ASSERT(pageID != invalidLogicalPageID);
}
} else {
debug_printf("DWALPager(%s) read %s @%" PRId64 " (not remapped)\n", filename.c_str(),
debug_printf("DWALPager(%s) op=readAtVersionNotRemapped %s @%" PRId64 " (not remapped)\n", filename.c_str(),
toString(pageID).c_str(), v);
}
@ -1623,29 +1698,126 @@ public:
return std::min(pLastCommittedHeader->oldestVersion, snapshots.front().version);
}
ACTOR static Future<Void> remapCopyAndFree(DWALPager* self, RemappedPage p, VersionToPageMapT *m, VersionToPageMapT::iterator i) {
debug_printf("DWALPager(%s) remapCleanup copyAndFree %s\n", self->filename.c_str(), p.toString().c_str());
ACTOR static Future<Void> removeRemapEntry(DWALPager* self, RemappedPage p, Version oldestRetainedVersion) {
// Get iterator to the versioned page map entry for the original page
state PageToVersionedMapT::iterator iPageMapPair = self->remappedPages.find(p.originalPageID);
// The iterator must be valid, the version map must not be empty, and it must contain an entry for p's version
ASSERT(iPageMapPair != self->remappedPages.end());
ASSERT(!iPageMapPair->second.empty());
state VersionToPageMapT::iterator iVersionPagePair = iPageMapPair->second.find(p.version);
ASSERT(iVersionPagePair != iPageMapPair->second.end());
// Read the data from the page that the original was mapped to
Reference<IPage> data = wait(self->readPage(p.newPageID, false));
RemappedPage::Type firstType = p.getType();
state RemappedPage::Type secondType;
bool secondAfterOldestRetainedVersion = false;
state bool deleteAtSameVersion = false;
if(p.newPageID == iVersionPagePair->second) {
auto nextEntry = iVersionPagePair;
++nextEntry;
if(nextEntry == iPageMapPair->second.end()) {
secondType = RemappedPage::NONE;
} else {
secondType = RemappedPage::getTypeOf(nextEntry->second);
secondAfterOldestRetainedVersion = nextEntry->first >= oldestRetainedVersion;
}
} else {
ASSERT(iVersionPagePair->second == invalidLogicalPageID);
secondType = RemappedPage::FREE;
deleteAtSameVersion = true;
}
ASSERT(firstType == RemappedPage::REMAP || secondType == RemappedPage::NONE);
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.originalPageID, data);
++g_redwoodMetrics.pagerRemapCopy;
// Scenarios and actions to take:
//
// The first letter (firstType) is the type of the entry just popped from the remap queue.
// The second letter (secondType) is the type of the next item in the queue for the same
// original page ID, if present. If not present, secondType will be NONE.
//
// Since the next item can be arbitrarily ahead in the queue, secondType is determined by
// looking at the remappedPages structure.
//
// R == Remap F == Free D == Detach | == oldestRetainedVersion
//
// R R | free new ID
// R F | free new ID if R and F are at different versions
// R D | do nothing
// R | R copy new to original ID, free new ID
// R | F copy new to original ID, free new ID
// R | D copy new to original ID
// R | copy new to original ID, free new ID
// F | free original ID
// D | free original ID
//
// Note the following special case:
//
// Page is detached while it is being read in remapCopyAndFree()
// Initial state: R |
// Start remapCopyAndFree(), intending to copy newID to originalID and free newID
// New state: R | D
// Read of newID completes.
// Copy new contents over original, do NOT free new ID
// Later popped state: D |
// free original ID
//
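// Worked example (editor's illustration): for a popped entry of type R whose next entry for the same
// original ID is an F at or beyond oldestRetainedVersion ("R | F" above), the flags below become
// freeNewID=true, copyNewToOriginal=true, freeOriginalID=false: the new page's contents are copied
// back over the original ID and the new ID is freed.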
state bool freeNewID = (firstType == RemappedPage::REMAP && secondType != RemappedPage::DETACH && !deleteAtSameVersion);
state bool copyNewToOriginal = (firstType == RemappedPage::REMAP && (secondAfterOldestRetainedVersion || secondType == RemappedPage::NONE));
state bool freeOriginalID = (firstType == RemappedPage::FREE || firstType == RemappedPage::DETACH);
// Now that the page data has been copied to the original page, the versioned page map entry is no longer
// needed and the new page ID can be freed as of the next commit.
m->erase(i);
self->freeUnmappedPage(p.newPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
debug_printf("DWALPager(%s) remapCleanup %s secondType=%c mapEntry=%s oldestRetainedVersion=%" PRId64 " \n",
self->filename.c_str(), p.toString().c_str(), secondType, ::toString(*iVersionPagePair).c_str(), oldestRetainedVersion);
if(copyNewToOriginal) {
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
// Read the data from the page that the original was mapped to
Reference<IPage> data = wait(self->readPage(p.newPageID, false, true));
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.originalPageID, data);
++g_redwoodMetrics.pagerRemapCopy;
} else if (firstType == RemappedPage::REMAP) {
++g_redwoodMetrics.pagerRemapSkip;
}
// Now that the page contents have been copied to the original page, if the corresponding map entry
// represented the remap and there wasn't a delete later in the queue at p for the same version then
// erase the entry.
if(!deleteAtSameVersion) {
debug_printf("DWALPager(%s) remapCleanup deleting map entry %s\n", self->filename.c_str(), p.toString().c_str());
// Erase the entry and set iVersionPagePair to the next entry or end
iVersionPagePair = iPageMapPair->second.erase(iVersionPagePair);
// If the map is now empty, delete it
if(iPageMapPair->second.empty()) {
debug_printf("DWALPager(%s) remapCleanup deleting empty map %s\n", self->filename.c_str(), p.toString().c_str());
self->remappedPages.erase(iPageMapPair);
} else if(freeNewID && secondType == RemappedPage::NONE && iVersionPagePair != iPageMapPair->second.end() && RemappedPage::getTypeOf(iVersionPagePair->second) == RemappedPage::DETACH) {
// If we intend to free the new ID and there was no map entry, one could have been added during the wait above.
// If so, and if it was a detach operation, then we can't free the new page ID as its lifetime will be managed
// by the client starting at some later version.
freeNewID = false;
}
}
if(freeNewID) {
debug_printf("DWALPager(%s) remapCleanup freeNew %s\n", self->filename.c_str(), p.toString().c_str());
self->freeUnmappedPage(p.newPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
}
if(freeOriginalID) {
debug_printf("DWALPager(%s) remapCleanup freeOriginal %s\n", self->filename.c_str(), p.toString().c_str());
self->freeUnmappedPage(p.originalPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
}
return Void();
}
ACTOR static Future<Void> remapCleanup(DWALPager* self) {
state ActorCollection copies(true);
state ActorCollection tasks(true);
state Promise<Void> signal;
copies.add(signal.getFuture());
tasks.add(signal.getFuture());
self->remapCleanupStop = false;
@ -1654,8 +1826,7 @@ public:
state Version oldestRetainedVersion = self->effectiveOldestVersion();
// Cutoff is the version we can pop to
state RemappedPage cutoff;
cutoff.version = oldestRetainedVersion - self->remapCleanupWindow;
state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
// Minimum version we must pop to before obeying stop command.
state Version minStopVersion = cutoff.version - (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG);
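// Worked example (editor's illustration with made-up numbers): with oldestRetainedVersion=1000,
// remapCleanupWindow=100 and REDWOOD_REMAP_CLEANUP_LAG=2, cutoff.version is 900 and minStopVersion
// is 700, so a stop request is only honored once entries up to version 700 have been processed.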
@ -1663,46 +1834,15 @@ public:
loop {
state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
debug_printf("DWALPager(%s) remapCleanup popped %s\n", self->filename.c_str(), ::toString(p).c_str());
// Stop if we have reached the cutoff version, which is the start of the cleanup coalescing window
if (!p.present()) {
break;
}
// Get iterator to the versioned page map entry for the original page
auto iPageMapPair = self->remappedPages.find(p.get().originalPageID);
// The iterator must be valid and not empty and its first page map entry must match p's version
ASSERT(iPageMapPair != self->remappedPages.end());
ASSERT(!iPageMapPair->second.empty());
auto iVersionPagePair = iPageMapPair->second.begin();
ASSERT(iVersionPagePair->first == p.get().version);
// If this is a free page entry then free the original page ID
if(p.get().isFree()) {
debug_printf("DWALPager(%s) remapCleanup free %s\n", self->filename.c_str(),
p.get().toString().c_str());
self->freeUnmappedPage(p.get().originalPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
// There can't be any more entries in the page map after this one so verify that
// the map size is 1 and erase the map for p's original page ID.
ASSERT(iPageMapPair->second.size() == 1);
self->remappedPages.erase(iPageMapPair);
}
else {
// If there is no next page map entry or there is but it is after the oldest retained version
// then p must be copied to unmap it.
auto iNextVersionPagePair = iVersionPagePair;
++iNextVersionPagePair;
if(iNextVersionPagePair == iPageMapPair->second.end() || iNextVersionPagePair->first > oldestRetainedVersion) {
// Copy the remapped page to the original so it can be freed.
copies.add(remapCopyAndFree(self, p.get(), &iPageMapPair->second, iVersionPagePair));
}
else {
debug_printf("DWALPager(%s) remapCleanup skipAndFree %s\n", self->filename.c_str(), p.get().toString().c_str());
self->freeUnmappedPage(p.get().newPageID, 0);
++g_redwoodMetrics.pagerRemapFree;
++g_redwoodMetrics.pagerRemapSkip;
iPageMapPair->second.erase(iVersionPagePair);
}
Future<Void> task = removeRemapEntry(self, p.get(), oldestRetainedVersion);
if(!task.isReady()) {
tasks.add(task);
}
// If the stop flag is set and we've reached the minimum stop version according to the allowed lag then stop.
@ -1713,7 +1853,7 @@ public:
debug_printf("DWALPager(%s) remapCleanup stopped (stop=%d)\n", self->filename.c_str(), self->remapCleanupStop);
signal.send(Void());
wait(copies.getResult());
wait(tasks.getResult());
return Void();
}
@ -1889,8 +2029,7 @@ public:
Future<int64_t> getUserPageCount() override {
return map(getUserPageCount_cleanup(this), [=](Void) {
int64_t userPages = pHeader->pageCount - 2 - freeList.numPages - freeList.numEntries -
delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages
- remapQueue.numEntries;
delayedFreeList.numPages - delayedFreeList.numEntries - remapQueue.numPages;
debug_printf("DWALPager(%s) userPages=%" PRId64 " totalPageCount=%" PRId64 " freeQueuePages=%" PRId64
" freeQueueCount=%" PRId64 " delayedFreeQueuePages=%" PRId64 " delayedFreeQueueCount=%" PRId64
@ -2871,6 +3010,38 @@ public:
typedef FIFOQueue<LazyClearQueueEntry> LazyClearQueueT;
struct ParentInfo {
ParentInfo() {
count = 0;
bits = 0;
}
void clear() {
count = 0;
bits = 0;
}
static uint32_t mask(LogicalPageID id) {
return 1 << (id & 31);
}
void pageUpdated(LogicalPageID child) {
auto m = mask(child);
if((bits & m) == 0) {
bits |= m;
++count;
}
}
bool maybeUpdated(LogicalPageID child) {
return (mask(child) & bits) != 0;
}
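// Example (editor's illustration with made-up IDs): mask(5) and mask(37) are both 1 << 5, so after
// pageUpdated(5) a later pageUpdated(37) sets no new bit and does not increment count. maybeUpdated()
// can therefore return false positives for colliding IDs, but never a false negative for a child that
// was actually updated.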
uint32_t bits;
int count;
};
typedef std::unordered_map<LogicalPageID, ParentInfo> ParentInfoMapT;
#pragma pack(push, 1)
struct MetaKey {
static constexpr int FORMAT_VERSION = 8;
@ -2923,8 +3094,8 @@ public:
// durable once the following call to commit() returns
void set(KeyValueRef keyValue) override {
++g_redwoodMetrics.opSet;
++g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
++g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
g_redwoodMetrics.opSetKeyBytes += keyValue.key.size();
g_redwoodMetrics.opSetValueBytes += keyValue.value.size();
m_pBuffer->insert(keyValue.key).mutation().setBoundaryValue(m_pBuffer->copyToArena(keyValue.value));
}
@ -3022,7 +3193,7 @@ public:
// If this page is height 2, then the children are leaves so free them directly
if (btPage.height == 2) {
debug_printf("LazyClear: freeing child %s\n", toString(btChildPageID).c_str());
self->freeBtreePage(btChildPageID, v);
self->freeBTreePage(btChildPageID, v);
freedPages += btChildPageID.size();
metrics.lazyClearFree += 1;
metrics.lazyClearFreeExt += (btChildPageID.size() - 1);
@ -3041,7 +3212,7 @@ public:
// Free the page, now that its children have either been freed or queued
debug_printf("LazyClear: freeing queue entry %s\n", toString(entry.pageID).c_str());
self->freeBtreePage(entry.pageID, v);
self->freeBTreePage(entry.pageID, v);
freedPages += entry.pageID.size();
metrics.lazyClearFree += 1;
metrics.lazyClearFreeExt += entry.pageID.size() - 1;
@ -3146,7 +3317,7 @@ public:
return commit_impl(this);
}
ACTOR static Future<Void> destroyAndCheckSanity_impl(VersionedBTree* self) {
ACTOR static Future<Void> clearAllAndCheckSanity_impl(VersionedBTree* self) {
ASSERT(g_network->isSimulated());
debug_printf("Clearing tree.\n");
@ -3191,7 +3362,7 @@ public:
return Void();
}
Future<Void> destroyAndCheckSanity() { return destroyAndCheckSanity_impl(this); }
Future<Void> clearAllAndCheckSanity() { return clearAllAndCheckSanity_impl(this); }
private:
// Represents a change to a single key - set, clear, or atomic op
@ -3412,6 +3583,8 @@ private:
Future<Void> m_init;
std::string m_name;
int m_blockSize;
std::unordered_map<LogicalPageID, ParentInfo> parents;
ParentInfoMapT childUpdateTracker;
// MetaKey changes size so allocate space for it to expand into
union {
@ -3603,7 +3776,7 @@ private:
// must be rewritten anyway to account for the change in child count or child links.
// Free the old IDs, but only once (before the first output record is added).
if (records.empty()) {
self->freeBtreePage(previousID, v);
self->freeBTreePage(previousID, v);
}
for (p = 0; p < pages.size(); ++p) {
LogicalPageID id = wait(self->m_pager->newPageID());
@ -3771,7 +3944,7 @@ private:
}
}
void freeBtreePage(BTreePageIDRef btPageID, Version v) {
void freeBTreePage(BTreePageIDRef btPageID, Version v) {
// Free individual pages at v
for (LogicalPageID id : btPageID) {
m_pager->freePage(id, v);
@ -3780,7 +3953,7 @@ private:
// Write new version of pageID at version v using page as its data.
// Attempts to reuse original id(s) in btPageID, returns BTreePageID.
ACTOR static Future<BTreePageIDRef> updateBtreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self, BTreePageIDRef oldID, Arena* arena,
Reference<IPage> page, Version writeVersion) {
state BTreePageIDRef newID;
newID.resize(*arena, oldID.size());
@ -3878,19 +4051,23 @@ private:
// If the last record in the range has a null link then this will be null.
const RedwoodRecordRef* expectedUpperBound;
bool inPlaceUpdate;
// CommitSubtree will call one of the following three functions based on its exit path
// Subtree was cleared.
void cleared() {
inPlaceUpdate = false;
childrenChanged = true;
expectedUpperBound = nullptr;
}
// Page was updated in-place through edits and written to maybeNewID
void updatedInPlace(BTreePageIDRef maybeNewID, BTreePage* btPage, int capacity) {
inPlaceUpdate = true;
auto& metrics = g_redwoodMetrics.level(btPage->height);
metrics.pageModify += 1;
metrics.pageModify += (maybeNewID.size() - 1);
metrics.pageModifyExt += (maybeNewID.size() - 1);
metrics.modifyFillPct += (double)btPage->size() / capacity;
metrics.modifyStoredPct += (double)btPage->kvBytes / capacity;
metrics.modifyItemCount += btPage->tree().numItems;
@ -3912,6 +4089,7 @@ private:
// writePages() was used to build 1 or more replacement pages.
void rebuilt(Standalone<VectorRef<RedwoodRecordRef>> newRecords) {
inPlaceUpdate = false;
newLinks = newRecords;
childrenChanged = true;
@ -3952,14 +4130,15 @@ private:
struct InternalPageModifier {
InternalPageModifier() {}
InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating)
: btPage(p), m(m), updating(updating), changesMade(false) {}
InternalPageModifier(BTreePage* p, BTreePage::BinaryTree::Mirror* m, bool updating, ParentInfo *parentInfo)
: btPage(p), m(m), updating(updating), changesMade(false), parentInfo(parentInfo) {}
bool updating;
BTreePage* btPage;
BTreePage::BinaryTree::Mirror* m;
Standalone<VectorRef<RedwoodRecordRef>> rebuild;
bool changesMade;
ParentInfo *parentInfo;
bool empty() const {
if (updating) {
@ -4055,6 +4234,13 @@ private:
// endpoint.
changesMade = true;
} else {
if(u.inPlaceUpdate) {
for(auto id : u.decodeLowerBound->getChildPage()) {
parentInfo->pageUpdated(id);
}
}
keep(u.cBegin, u.cEnd);
}
@ -4226,7 +4412,7 @@ private:
debug_printf("%s Inserted %s [mutation, boundary start]\n", context.c_str(),
rec.toString().c_str());
} else {
debug_printf("%s Inserted failed for %s [mutation, boundary start]\n", context.c_str(),
debug_printf("%s Insert failed for %s [mutation, boundary start]\n", context.c_str(),
rec.toString().c_str());
switchToLinearMerge();
}
@ -4339,12 +4525,12 @@ private:
// If the tree is now empty, delete the page
if (deltaTree.numItems == 0) {
update->cleared();
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
debug_printf("%s Page updates cleared all entries, returning %s\n", context.c_str(),
toString(*update).c_str());
} else {
// Otherwise update it.
BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
page.castTo<IPage>(), writeVersion));
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
@ -4357,7 +4543,7 @@ private:
// If everything in the page was deleted then this page should be deleted as of the new version
if (merged.empty()) {
update->cleared();
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
debug_printf("%s All leaf page contents were cleared, returning %s\n", context.c_str(),
toString(*update).c_str());
@ -4511,7 +4697,7 @@ private:
if (btPage->height == 2) {
debug_printf("%s: freeing child page in cleared subtree range: %s\n",
context.c_str(), ::toString(rec.getChildPage()).c_str());
self->freeBtreePage(rec.getChildPage(), writeVersion);
self->freeBTreePage(rec.getChildPage(), writeVersion);
} else {
debug_printf("%s: queuing subtree deletion cleared subtree range: %s\n",
context.c_str(), ::toString(rec.getChildPage()).c_str());
@ -4547,7 +4733,10 @@ private:
wait(waitForAll(recursions));
debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());
state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate);
// Note: parentInfo could be invalid after a wait and must be re-initialized.
// All uses below occur before waits so no reinitialization is done.
state ParentInfo *parentInfo = &self->childUpdateTracker[rootID.front()];
state InternalPageModifier m(btPage, cursor.mirror, tryToUpdate, parentInfo);
// Apply the possible changes for each subtree range recursed to, except the last one.
// For each range, the expected next record, if any, is checked against the first boundary
@ -4565,25 +4754,103 @@ private:
context.c_str(), m.changesMade, update->toString().c_str());
m.applyUpdate(*slices.back(), m.changesMade ? update->subtreeUpperBound : update->decodeUpperBound);
state bool detachChildren = (parentInfo->count > 2);
state bool forceUpdate = false;
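// Editor's note on the threshold above: parentInfo->count only exceeds 2 after at least three distinct
// mask bits have been set by pageUpdated(), so child detachment is attempted only for parents whose
// children have seen several in-place updates since the last clear().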
if(!m.changesMade && detachChildren) {
debug_printf("%s Internal page forced rewrite because at least %d children have been updated in-place.\n", context.c_str(), parentInfo->count);
forceUpdate = true;
if(!m.updating) {
page = self->cloneForUpdate(page);
cursor = getCursor(page);
btPage = (BTreePage*)page->begin();
m.btPage = btPage;
m.m = cursor.mirror;
m.updating = true;
}
++g_redwoodMetrics.level(btPage->height).forceUpdate;
}
// If page contents have changed
if (m.changesMade) {
if ((m.empty())) {
if (m.changesMade || forceUpdate) {
if (m.empty()) {
update->cleared();
debug_printf("%s All internal page children were deleted so deleting this page too, returning %s\n",
context.c_str(), toString(*update).c_str());
self->freeBtreePage(rootID, writeVersion);
self->freeBTreePage(rootID, writeVersion);
self->childUpdateTracker.erase(rootID.front());
} else {
if (m.updating) {
// Page was updated in place
BTreePageIDRef newID = wait(self->updateBtreePage(self, rootID, &update->newLinks.arena(),
// Page was updated in place (or being forced to be updated in place to update child page ids)
debug_printf("%s Internal page modified in-place tryUpdate=%d forceUpdate=%d detachChildren=%d\n", context.c_str(), tryToUpdate, forceUpdate, detachChildren);
if(detachChildren) {
int detached = 0;
cursor.moveFirst();
auto &stats = g_redwoodMetrics.level(btPage->height);
while(cursor.valid()) {
if(cursor.get().value.present()) {
for(auto &p : cursor.get().getChildPage()) {
if(parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
if(newID != invalidLogicalPageID) {
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
p = newID;
++stats.detachChild;
++detached;
}
}
}
}
cursor.moveNext();
}
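// Editor's note: 'p' in the loop above is a reference into this page's child page ID array, so
// assigning newID rewrites the child link in place before the page is written out by
// updateBTreePage() below.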
parentInfo->clear();
if(forceUpdate && detached == 0) {
debug_printf("%s No children detached during forced update, returning %s\n", context.c_str(), toString(*update).c_str());
return Void();
}
}
BTreePageIDRef newID = wait(self->updateBTreePage(self, rootID, &update->newLinks.arena(),
page.castTo<IPage>(), writeVersion));
debug_printf(
"%s commitSubtree(): Internal page updated in-place at version %s, new contents: %s\n", context.c_str(), toString(writeVersion).c_str(),
btPage->toString(false, newID, snapshot->getVersion(), update->decodeLowerBound, update->decodeUpperBound)
.c_str());
update->updatedInPlace(newID, btPage, newID.size() * self->m_blockSize);
debug_printf("%s Internal page updated in-place, returning %s\n", context.c_str(),
toString(*update).c_str());
} else {
// Page was rebuilt, possibly split.
debug_printf("%s Internal page modified, creating replacements.\n", context.c_str());
debug_printf("%s Internal page could not be modified, rebuilding replacement(s).\n", context.c_str());
if(detachChildren) {
auto &stats = g_redwoodMetrics.level(btPage->height);
for(auto &rec : m.rebuild) {
if(rec.value.present()) {
BTreePageIDRef oldPages = rec.getChildPage();
BTreePageIDRef newPages;
for(int i = 0; i < oldPages.size(); ++i) {
LogicalPageID p = oldPages[i];
if(parentInfo->maybeUpdated(p)) {
LogicalPageID newID = self->m_pager->detachRemappedPage(p, writeVersion);
if(newID != invalidLogicalPageID) {
// Rebuild record values reference original page memory so make a copy
if(newPages.empty()) {
newPages = BTreePageIDRef(m.rebuild.arena(), oldPages);
rec.setChildPage(newPages);
}
debug_printf("%s Detach updated %u -> %u\n", context.c_str(), p, newID);
newPages[i] = newID;
++stats.detachChild;
}
}
}
}
}
parentInfo->clear();
}
Standalone<VectorRef<RedwoodRecordRef>> newChildEntries =
wait(writePages(self, update->subtreeLowerBound, update->subtreeUpperBound, m.rebuild,
@ -4985,7 +5252,7 @@ public:
bool isValid() const { return valid; }
std::string toString() const {
std::string r;
std::string r = format("{ptr=%p %s ", this, ::toString(pager->getVersion()).c_str());
for (int i = 0; i < path.size(); ++i) {
r += format("[%d/%d: %s] ", i + 1, path.size(),
path[i].cursor.valid() ? path[i].cursor.get().toString(path[i].btPage->isLeaf()).c_str()
@ -4994,6 +5261,7 @@ public:
if (!valid) {
r += " (invalid) ";
}
r += "}";
return r;
}
@ -5014,6 +5282,8 @@ public:
const RedwoodRecordRef& upperBound) {
Reference<const IPage>& page = pages[id.front()];
if (page.isValid()) {
// The pager won't see this access so count it as a cache hit
++g_redwoodMetrics.pagerCacheHit;
path.push_back(arena, { (BTreePage*)page->begin(), getCursor(page) });
return Void();
}
@ -6960,24 +7230,23 @@ TEST_CASE("!/redwood/correctness/btree") {
state int pageSize =
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
state int64_t targetPageOps = shortTest ? 50000 : 1000000;
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .01);
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
state int maxValueSize = randomSize(pageSize * 25);
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
state int mutationBytesTarget =
shortTest ? 100000 : randomSize(std::min<int>(maxCommitSize * 100, pageSize * 100000));
state double clearProbability = deterministicRandom()->random01() * .1;
state double clearSingleKeyProbability = deterministicRandom()->random01();
state double clearPostSetProbability = deterministicRandom()->random01() * .1;
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
state double advanceOldVersionProbability = deterministicRandom()->random01();
state double maxDuration = 60;
state int64_t cacheSizeBytes =
pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);
printf("\n");
printf("targetPageOps: %" PRId64 "\n", targetPageOps);
printf("pagerMemoryOnly: %d\n", pagerMemoryOnly);
printf("serialTest: %d\n", serialTest);
printf("shortTest: %d\n", shortTest);
@ -6985,7 +7254,6 @@ TEST_CASE("!/redwood/correctness/btree") {
printf("maxKeySize: %d\n", maxKeySize);
printf("maxValueSize: %d\n", maxValueSize);
printf("maxCommitSize: %d\n", maxCommitSize);
printf("mutationBytesTarget: %d\n", mutationBytesTarget);
printf("clearProbability: %f\n", clearProbability);
printf("clearSingleKeyProbability: %f\n", clearSingleKeyProbability);
printf("clearPostSetProbability: %f\n", clearPostSetProbability);
@ -7000,8 +7268,6 @@ TEST_CASE("!/redwood/correctness/btree") {
deleteFile(pagerFile);
printf("Initializing...\n");
state double startTime = now();
pager = new DWALPager(pageSize, pagerFile, cacheSizeBytes, remapCleanupWindow, pagerMemoryOnly);
state VersionedBTree* btree = new VersionedBTree(pager, pagerFile);
wait(btree->init());
@ -7028,14 +7294,12 @@ TEST_CASE("!/redwood/correctness/btree") {
state PromiseStream<Version> committedVersions;
state Future<Void> verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
state Future<Void> randomTask = serialTest ? Void() : (randomReader(btree) || btree->getError());
committedVersions.send(lastVer);
state Future<Void> commit = Void();
state int64_t totalPageOps = 0;
while (mutationBytes.get() < mutationBytesTarget && (now() - startTime) < maxDuration) {
if (now() - startTime > 600) {
mutationBytesTarget = mutationBytes.get();
}
while (totalPageOps < targetPageOps) {
// Sometimes increment the version
if (deterministicRandom()->random01() < 0.10) {
++version;
@ -7131,14 +7395,12 @@ TEST_CASE("!/redwood/correctness/btree") {
}
// Commit at end or after this commit's mutation bytes are reached
if (mutationBytes.get() >= mutationBytesTarget || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
if (totalPageOps >= targetPageOps || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
// Wait for previous commit to finish
wait(commit);
printf("Committed. Next commit %d bytes, %" PRId64
"/%d (%.2f%%) Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
mutationBytesThisCommit, mutationBytes.get(), mutationBytesTarget,
(double)mutationBytes.get() / mutationBytesTarget * 100,
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
printf("Committed. Next commit %d bytes, %" PRId64 " bytes.", mutationBytesThisCommit, mutationBytes.get());
printf(" Stats: Insert %.2f MB/s ClearedKeys %.2f MB/s Total %.2f\n",
(keyBytesInserted.rate() + valueBytesInserted.rate()) / 1e6, keyBytesCleared.rate() / 1e6,
mutationBytes.rate() / 1e6);
Version v = version; // Avoid capture of version as a member of *this
@ -7151,8 +7413,12 @@ TEST_CASE("!/redwood/correctness/btree") {
btree->getOldestVersion() + 1));
}
commit = map(btree->commit(), [=](Void) {
commit = map(btree->commit(), [=,&ops=totalPageOps](Void) {
// Update pager ops before clearing metrics
ops += g_redwoodMetrics.pageOps();
printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%)\n", ops, targetPageOps, ops * 100.0 / targetPageOps);
printf("Committed:\n%s\n", g_redwoodMetrics.toString(true).c_str());
// Notify the background verifier that version is committed and therefore readable
committedVersions.send(v);
return Void();
@ -7202,6 +7468,7 @@ TEST_CASE("!/redwood/correctness/btree") {
committedVersions = PromiseStream<Version>();
verifyTask = verify(btree, committedVersions.getFuture(), &written, &errorCount, serialTest);
randomTask = randomReader(btree) || btree->getError();
committedVersions.send(v);
}
version += versionIncrement;
@ -7209,7 +7476,7 @@ TEST_CASE("!/redwood/correctness/btree") {
}
// Check for errors
if (errorCount != 0) throw internal_error();
ASSERT(errorCount == 0);
}
debug_printf("Waiting for outstanding commit\n");
@ -7220,11 +7487,18 @@ TEST_CASE("!/redwood/correctness/btree") {
wait(verifyTask);
// Check for errors
if (errorCount != 0) throw internal_error();
ASSERT(errorCount == 0);
wait(btree->destroyAndCheckSanity());
// Reopen pager and btree with a remap cleanup window of 0 to reclaim all old pages
state Future<Void> closedFuture = btree->onClosed();
btree->close();
wait(closedFuture);
btree = new VersionedBTree(new DWALPager(pageSize, pagerFile, cacheSizeBytes, 0), pagerFile);
wait(btree->init());
Future<Void> closedFuture = btree->onClosed();
wait(btree->clearAllAndCheckSanity());
closedFuture = btree->onClosed();
btree->close();
debug_printf("Closing.\n");
wait(closedFuture);
@ -7330,7 +7604,7 @@ TEST_CASE("!/redwood/performance/set") {
state int minValueSize = 100;
state int maxValueSize = 500;
state int minConsecutiveRun = 1;
state int maxConsecutiveRun = 10;
state int maxConsecutiveRun = 100000;
state char firstKeyChar = 'a';
state char lastKeyChar = 'm';
state Version remapCleanupWindow = SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;

View File

@ -85,11 +85,12 @@ struct TrackIt {
class NonCopyable
{
protected:
NonCopyable () {}
~NonCopyable () {} /// Protected non-virtual destructor
private:
NonCopyable (const NonCopyable &);
NonCopyable & operator = (const NonCopyable &);
NonCopyable()=default;
~NonCopyable()=default; /// Protected non-virtual destructor
NonCopyable(NonCopyable&&)=default;
NonCopyable &operator=(NonCopyable&&)=default;
NonCopyable(const NonCopyable&)=delete;
NonCopyable &operator=(const NonCopyable &)=delete;
};
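// Illustrative usage (editor's sketch, not part of this change): a type inheriting NonCopyable keeps
// its implicit move operations but loses copying, e.g.
//   struct Buffer : NonCopyable { std::unique_ptr<uint8_t[]> data; };
//   Buffer a, b = std::move(a);   // ok: move construction
//   Buffer c = b;                 // error: copy constructor is deleted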
// An Arena is a custom allocator that consists of a set of ArenaBlocks. Allocation is performed by bumping a pointer
@ -174,9 +175,7 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted<ArenaBlock>
static ArenaBlock* create(int dataSize, Reference<ArenaBlock>& next);
void destroy();
void destroyLeaf();
private:
static void* operator new(size_t s); // not implemented
static void* operator new(size_t s)=delete;
};
inline void* operator new ( size_t size, Arena& p ) {

View File

@ -118,6 +118,7 @@ public:
static volatile int32_t pageCount;
#endif
FastAllocator()=delete;
private:
#ifdef VALGRIND
static unsigned long vLock;
@ -147,7 +148,6 @@ private:
}
static void* freelist;
FastAllocator(); // not implemented
static void initThread();
static void getMagazine();
static void releaseMagazine(void*);

View File

@ -71,11 +71,10 @@ class ThreadPool : public IThreadPool, public ReferenceCounted<ThreadPool> {
PThreadAction action;
ActionWrapper(PThreadAction action) : action(action) {}
// HACK: Boost won't use move constructors, so we just assume the last copy made is the one that will be called or cancelled
ActionWrapper(ActionWrapper const& r) : action(r.action) { const_cast<ActionWrapper&>(r).action=NULL; }
void operator()() { Thread::dispatch(action); action = NULL; }
ActionWrapper(ActionWrapper const& r) : action(r.action) { const_cast<ActionWrapper&>(r).action=nullptr; }
void operator()() { Thread::dispatch(action); action = nullptr; }
~ActionWrapper() { if (action) { action->cancel(); } }
private:
ActionWrapper &operator=(ActionWrapper const&);
ActionWrapper &operator=(ActionWrapper const&)=delete;
};
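// Editor's note on the copy-transfer hack above: copying an ActionWrapper hands ownership of 'action'
// to the copy, so only the last copy dispatches or cancels it, e.g.
//   ActionWrapper w1(a);
//   ActionWrapper w2(w1);   // w1.action is now nullptr; w2 will dispatch or cancel 'a'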
public:
ThreadPool(int stackSize) : dontstop(ios), mode(Run), stackSize(stackSize) {}

View File

@ -128,6 +128,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason);
};
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.

View File

@ -1,11 +1,9 @@
testTitle=StorefrontTest
clearAfterTest=false
testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0
clearAfterTest=false
testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0

View File

@ -1,7 +1,6 @@
testTitle=StorefrontTest
runSetup=false
testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4
testName=Storefront
actorsPerClient=50
itemCount=100000
maxOrderSize=4