Added throttling when a blob worker falls behind (#7751)

* throttle the cluster when blob workers fall behind

* do not throttle based on blob workers if they are not enabled

* remove an unnecessary actor

* fixed a compile error

* fetch blob worker metrics at the same interval as the rate is updated, and avoid fetching the complete blob worker list too frequently

* fixed another compilation bug

* added a 5 second delay before bw throttling to prevent false positives caused by the 100e6 version jump during recovery. Lowered the throttling thresholds to react much more quickly to bw lag.

* fixed a number of problems

* changed the minBlobVersionRequest to look at storage server versions since this will be a lot more efficient

* fix: do not let desired go backwards

* fix: track the version of notAtLatest changefeeds for throttling

* ratekeeper now throttles the cluster by estimating the transactions-per-second throughput of the blob workers

* added metrics for blob worker change feeds

* added a knob to disable bw throttling

* fixed the transaction options in blob manager
Evan Tschannen authored 2022-08-12 13:15:56 -07:00, committed by GitHub
parent 35a3a33d1c
commit a9d3c9f9b3
19 changed files with 531 additions and 56 deletions
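
At its core, the new logic in Ratekeeper::updateRate (see the Ratekeeper hunks below) turns the observed blob worker lag into a cap on the cluster's transaction rate. The following is a minimal, self-contained sketch of that calculation with the knob values from this commit inlined; the standalone helper and its signature are illustrative only and not part of the change:

#include <cstdint>
#include <limits>

// Illustrative helper (not in the commit): maps blob worker lag to a TPS cap.
// blobWorkerLag        - seconds the slowest blob worker's minimum version is behind
// bwLagTarget          - TARGET_BW_LAG (50.0) for default priority, TARGET_BW_LAG_BATCH (20.0) for batch
// releasedTransactions - transactions released while the workers advanced across the sample window
// elapsed              - length of the sample window in seconds (roughly BW_ESTIMATION_INTERVAL)
double blobWorkerTpsCap(double blobWorkerLag, double bwLagTarget, int64_t releasedTransactions, double elapsed) {
	if (blobWorkerLag <= bwLagTarget / 2) {
		// Workers are keeping up; no blob-worker-based limit.
		return std::numeric_limits<double>::infinity();
	}
	double targetRateRatio;
	if (blobWorkerLag > 3 * bwLagTarget) {
		targetRateRatio = 0; // far behind (or workers unresponsive): stop releasing transactions
	} else if (blobWorkerLag > bwLagTarget) {
		targetRateRatio = 0.9; // BW_LAG_DECREASE_AMOUNT: release slightly less than the workers absorbed
	} else {
		targetRateRatio = 1.1; // BW_LAG_INCREASE_AMOUNT: lag is tolerable, allow the rate to grow
	}
	// Estimate the workers' sustainable throughput from what was released over the window,
	// then scale it by the target ratio so they can catch up (or the rate can grow slowly).
	return targetRateRatio * releasedTransactions / elapsed;
}

For batch priority the change subtracts the normal-priority transactions from the released count first, so batch traffic is squeezed before default traffic; and when no blob workers respond at all, the limit drops to zero with reason blob_worker_missing.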


@ -379,7 +379,9 @@
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed"
"storage_server_list_fetch_failed",
"blob_worker_lag",
"blob_worker_missing"
]
},
"description":"The database is not being saturated by the workload."
@ -400,7 +402,9 @@
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed"
"storage_server_list_fetch_failed",
"blob_worker_lag",
"blob_worker_missing"
]
},
"description":"The database is not being saturated by the workload."


@ -131,6 +131,9 @@ min_free_space_ratio Running out of space (approaching 5% limit).
log_server_min_free_space Log server running out of space (approaching 100MB limit).
log_server_min_free_space_ratio Log server running out of space (approaching 5% limit).
storage_server_durability_lag Storage server durable version falling behind.
storage_server_list_fetch_failed Unable to fetch storage server list.
blob_worker_lag Blob worker granule version falling behind.
blob_worker_missing No blob workers are reporting metrics.
=================================== ====================================================
The JSON path ``cluster.qos.throttled_tags``, when it exists, is an Object containing ``"auto"``, ``"manual"`` and ``"recommended"``. The possible fields for those objects are in the following table:


@ -23,6 +23,7 @@
#include <algorithm>
#include <cstdio>
#include <iterator>
#include <limits>
#include <memory>
#include <regex>
#include <unordered_set>
@ -1826,6 +1827,9 @@ DatabaseContext::~DatabaseContext() {
it->second->notifyContextDestroyed();
ASSERT_ABORT(server_interf.empty());
locationCache.insert(allKeys, Reference<LocationInfo>());
for (auto& it : notAtLatestChangeFeeds) {
it.second->context = nullptr;
}
TraceEvent("DatabaseContextDestructed", dbId).backtrace();
}
@ -8705,6 +8709,39 @@ Reference<ChangeFeedStorageData> DatabaseContext::getStorageData(StorageServerIn
return it->second;
}
Version DatabaseContext::getMinimumChangeFeedVersion() {
Version minVersion = std::numeric_limits<Version>::max();
for (auto& it : changeFeedUpdaters) {
minVersion = std::min(minVersion, it.second->version.get());
}
for (auto& it : notAtLatestChangeFeeds) {
if (it.second->getVersion() > 0) {
minVersion = std::min(minVersion, it.second->getVersion());
}
}
return minVersion;
}
void DatabaseContext::setDesiredChangeFeedVersion(Version v) {
for (auto& it : changeFeedUpdaters) {
if (it.second->version.get() < v && it.second->desired.get() < v) {
it.second->desired.set(v);
}
}
}
ChangeFeedData::ChangeFeedData(DatabaseContext* context)
: dbgid(deterministicRandom()->randomUniqueID()), context(context), notAtLatest(1) {
if (context) {
context->notAtLatestChangeFeeds[dbgid] = this;
}
}
ChangeFeedData::~ChangeFeedData() {
if (context) {
context->notAtLatestChangeFeeds.erase(dbgid);
}
}
Version ChangeFeedData::getVersion() {
return lastReturnedVersion.get();
}
@ -8896,6 +8933,9 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
if (refresh.canBeSet() && !atLatestVersion && rep.atLatestVersion) {
atLatestVersion = true;
feedData->notAtLatest.set(feedData->notAtLatest.get() - 1);
if (feedData->notAtLatest.get() == 0 && feedData->context) {
feedData->context->notAtLatestChangeFeeds.erase(feedData->dbgid);
}
}
if (refresh.canBeSet() && rep.minStreamVersion > storageData->version.get()) {
storageData->version.set(rep.minStreamVersion);
@ -9099,6 +9139,9 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
results->storageData.push_back(db->getStorageData(interfs[i].first));
}
results->notAtLatest.set(interfs.size());
if (results->context) {
results->context->notAtLatestChangeFeeds[results->dbgid] = results.getPtr();
}
refresh.send(Void());
for (int i = 0; i < interfs.size(); i++) {
@ -9251,6 +9294,9 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
if (!atLatest && feedReply.atLatestVersion) {
atLatest = true;
results->notAtLatest.set(0);
if (results->context) {
results->context->notAtLatestChangeFeeds.erase(results->dbgid);
}
}
if (feedReply.minStreamVersion > results->storageData[0]->version.get()) {
@ -9302,6 +9348,9 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
Promise<Void> refresh = results->refresh;
results->refresh = Promise<Void>();
results->notAtLatest.set(1);
if (results->context) {
results->context->notAtLatestChangeFeeds[results->dbgid] = results.getPtr();
}
refresh.send(Void());
wait(results->streams[0].onError() || singleChangeFeedStreamInternal(range, results, rangeID, begin, end));
@ -9428,6 +9477,9 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
}
if (results->notAtLatest.get() == 0) {
results->notAtLatest.set(1);
if (results->context) {
results->context->notAtLatestChangeFeeds[results->dbgid] = results.getPtr();
}
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||


@ -427,7 +427,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed"
"storage_server_list_fetch_failed",
"blob_worker_lag",
"blob_worker_missing"
]
},
"description":"The database is not being saturated by the workload."
@ -448,7 +450,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed"
"storage_server_list_fetch_failed",
"blob_worker_lag",
"blob_worker_missing"
]
},
"description":"The database is not being saturated by the workload."


@ -654,6 +654,16 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DURABILITY_LAG_REDUCTION_RATE, 0.9999 );
init( DURABILITY_LAG_INCREASE_RATE, 1.001 );
init( STORAGE_SERVER_LIST_FETCH_TIMEOUT, 20.0 );
init( BW_THROTTLING_ENABLED, true );
init( TARGET_BW_LAG, 50.0 );
init( TARGET_BW_LAG_BATCH, 20.0 );
init( TARGET_BW_LAG_UPDATE, 9.0 );
init( MIN_BW_HISTORY, 10 );
init( BW_ESTIMATION_INTERVAL, 10.0 );
init( BW_LAG_INCREASE_AMOUNT, 1.1 );
init( BW_LAG_DECREASE_AMOUNT, 0.9 );
init( BW_FETCH_WORKERS_INTERVAL, 5.0 );
init( BW_RW_LOGGING_INTERVAL, 5.0 );
init( MAX_AUTO_THROTTLED_TRANSACTION_TAGS, 5 ); if(randomize && BUGGIFY) MAX_AUTO_THROTTLED_TRANSACTION_TAGS = 1;
init( MAX_MANUAL_THROTTLED_TRANSACTION_TAGS, 40 ); if(randomize && BUGGIFY) MAX_MANUAL_THROTTLED_TRANSACTION_TAGS = 1;


@ -49,6 +49,8 @@ struct BlobWorkerStats {
int mutationBytesBuffered;
int activeReadRequests;
int granulesPendingSplitCheck;
Version minimumCFVersion;
int notAtLatestChangeFeeds;
int64_t lastResidentMemory;
int64_t estimatedMaxResidentMemory;
@ -79,12 +81,15 @@ struct BlobWorkerStats {
readRequestsWithBegin("ReadRequestsWithBegin", cc), readRequestsCollapsed("ReadRequestsCollapsed", cc),
flushGranuleReqs("FlushGranuleReqs", cc), compressionBytesRaw("CompressionBytesRaw", cc),
compressionBytesFinal("CompressionBytesFinal", cc), fullRejections("FullRejections", cc), numRangesAssigned(0),
mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0),
initialSnapshotLock(initialSnapshotLock), resnapshotLock(resnapshotLock), deltaWritesLock(deltaWritesLock) {
mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0), minimumCFVersion(0),
notAtLatestChangeFeeds(0), initialSnapshotLock(initialSnapshotLock), resnapshotLock(resnapshotLock),
deltaWritesLock(deltaWritesLock) {
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; });
specialCounter(cc, "ActiveReadRequests", [this]() { return this->activeReadRequests; });
specialCounter(cc, "GranulesPendingSplitCheck", [this]() { return this->granulesPendingSplitCheck; });
specialCounter(cc, "MinimumChangeFeedVersion", [this]() { return this->minimumCFVersion; });
specialCounter(cc, "NotAtLatestChangeFeeds", [this]() { return this->notAtLatestChangeFeeds; });
specialCounter(cc, "InitialSnapshotsActive", [this]() { return this->initialSnapshotLock->activePermits(); });
specialCounter(cc, "InitialSnapshotsWaiting", [this]() { return this->initialSnapshotLock->waiters(); });
specialCounter(cc, "ReSnapshotsActive", [this]() { return this->resnapshotLock->activePermits(); });


@ -39,6 +39,7 @@ struct BlobWorkerInterface {
RequestStream<struct GranuleStatusStreamRequest> granuleStatusStreamRequest;
RequestStream<struct HaltBlobWorkerRequest> haltBlobWorker;
RequestStream<struct FlushGranuleRequest> flushGranuleRequest;
RequestStream<struct MinBlobVersionRequest> minBlobVersionRequest;
struct LocalityData locality;
UID myId;
@ -57,6 +58,7 @@ struct BlobWorkerInterface {
streams.push_back(granuleStatusStreamRequest.getReceiver());
streams.push_back(haltBlobWorker.getReceiver());
streams.push_back(flushGranuleRequest.getReceiver());
streams.push_back(minBlobVersionRequest.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
UID id() const { return myId; }
@ -85,6 +87,8 @@ struct BlobWorkerInterface {
RequestStream<struct HaltBlobWorkerRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(6));
flushGranuleRequest =
RequestStream<struct FlushGranuleRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(7));
minBlobVersionRequest =
RequestStream<struct MinBlobVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(8));
}
}
};
@ -139,6 +143,28 @@ struct RevokeBlobRangeRequest {
}
};
struct MinBlobVersionReply {
constexpr static FileIdentifier file_identifier = 6857512;
Version version;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version);
}
};
struct MinBlobVersionRequest {
constexpr static FileIdentifier file_identifier = 4833278;
Version grv;
ReplyPromise<MinBlobVersionReply> reply;
MinBlobVersionRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, grv, reply);
}
};
/*
* Continue: Blob worker should continue handling a granule that was evaluated for a split
* Normal: Blob worker should open the granule and start processing it


@ -25,6 +25,7 @@
#include "flow/FastRef.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "flow/IRandom.h"
#include "flow/genericactors.actor.h"
#include <vector>
#include <unordered_map>
@ -180,6 +181,8 @@ struct ChangeFeedData : ReferenceCounted<ChangeFeedData> {
Version getVersion();
Future<Void> whenAtLeast(Version version);
UID dbgid;
DatabaseContext* context;
NotifiedVersion lastReturnedVersion;
std::vector<Reference<ChangeFeedStorageData>> storageData;
AsyncVar<int> notAtLatest;
@ -189,7 +192,8 @@ struct ChangeFeedData : ReferenceCounted<ChangeFeedData> {
Version popVersion =
invalidVersion; // like TLog pop version, set by SS and client can check it to see if they missed data
ChangeFeedData() : notAtLatest(1) {}
explicit ChangeFeedData(DatabaseContext* context = nullptr);
~ChangeFeedData();
};
struct EndpointFailureInfo {
@ -468,8 +472,11 @@ public:
// map from changeFeedId -> changeFeedRange
std::unordered_map<Key, KeyRange> changeFeedCache;
std::unordered_map<UID, Reference<ChangeFeedStorageData>> changeFeedUpdaters;
std::map<UID, ChangeFeedData*> notAtLatestChangeFeeds;
Reference<ChangeFeedStorageData> getStorageData(StorageServerInterface interf);
Version getMinimumChangeFeedVersion();
void setDesiredChangeFeedVersion(Version v);
// map from ssid -> ss tag
// @note this map allows the client to identify the latest commit versions


@ -613,8 +613,17 @@ public:
double INITIAL_DURABILITY_LAG_MULTIPLIER;
double DURABILITY_LAG_REDUCTION_RATE;
double DURABILITY_LAG_INCREASE_RATE;
double STORAGE_SERVER_LIST_FETCH_TIMEOUT;
bool BW_THROTTLING_ENABLED;
double TARGET_BW_LAG;
double TARGET_BW_LAG_BATCH;
double TARGET_BW_LAG_UPDATE;
int MIN_BW_HISTORY;
double BW_ESTIMATION_INTERVAL;
double BW_LAG_INCREASE_AMOUNT;
double BW_LAG_DECREASE_AMOUNT;
double BW_FETCH_WORKERS_INTERVAL;
double BW_RW_LOGGING_INTERVAL;
// disk snapshot
int64_t MAX_FORKED_PROCESS_OUTPUT;


@ -112,6 +112,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID) {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
return files;
} catch (Error& e) {


@ -545,10 +545,11 @@ ACTOR Future<BlobGranuleSplitPoints> alignKeys(Reference<BlobManagerData> bmData
splitPoints.keys.push_back_deep(splitPoints.keys.arena(), splits.front());
state Transaction tr = Transaction(bmData->db);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state int idx = 1;
for (; idx < splits.size() - 1; idx++) {
loop {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// Get the next full key in the granule.
RangeResult nextKeyRes = wait(
@ -1146,8 +1147,9 @@ ACTOR Future<Void> writeInitialGranuleMapping(Reference<BlobManagerData> bmData,
state int j = 0;
loop {
try {
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkManagerLock(tr, bmData));
// Instead of doing a krmSetRange for each granule, because it does a read-modify-write, we do one
// krmSetRange for the whole batch, and then just individual sets for each intermediate boundary This
@ -1204,6 +1206,7 @@ ACTOR Future<Void> monitorTenants(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(loadTenantMap(tr, bmData));
state Future<Void> watchChange = tr->watch(TenantMetadata::lastTenantId().key);
@ -1232,6 +1235,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// read change key at this point along with data
state Optional<Value> ckvBegin = wait(tr->get(blobRangeChangeKey));
@ -1336,6 +1340,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Void> watchFuture;
Optional<Value> ckvEnd = wait(tr->get(blobRangeChangeKey));
@ -1468,8 +1473,9 @@ ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
state bool retried = false;
loop {
try {
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// make sure we're still manager when this transaction gets committed
wait(checkManagerLock(tr, bmData));
@ -1717,8 +1723,9 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
ASSERT(splitPoints.keys.size() > 2);
// make sure we're still manager when this transaction gets committed
@ -1949,6 +1956,8 @@ ACTOR Future<Void> forceGranuleFlush(Reference<BlobManagerData> bmData, KeyRange
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
if (currentRange.begin == currentRange.end) {
break;
}
@ -2091,6 +2100,7 @@ ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobMa
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkManagerLock(tr, bmData));
@ -2185,6 +2195,7 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkManagerLock(tr, bmData));
@ -2566,6 +2577,7 @@ ACTOR Future<Void> deregisterBlobWorker(Reference<BlobManagerData> bmData, BlobW
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
wait(checkManagerLock(tr, bmData));
Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
@ -3161,6 +3173,7 @@ ACTOR Future<Void> loadForcePurgedRanges(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// using the krm functions can produce incorrect behavior here as it does weird stuff with beginKey
KeyRange nextRange(KeyRangeRef(beginKey, blobGranuleForcePurgedKeys.end));
@ -3207,6 +3220,7 @@ ACTOR Future<Void> resumeActiveMerges(Reference<BlobManagerData> bmData, Future<
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
RangeResult result = wait(tr->getRange(currentRange, rowLimit));
state bool anyMore = result.more;
@ -3291,6 +3305,7 @@ ACTOR Future<Void> loadBlobGranuleMergeBoundaries(Reference<BlobManagerData> bmD
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
KeyRange nextRange(KeyRangeRef(beginKey, blobGranuleMergeBoundaryKeys.end));
// using the krm functions can produce incorrect behavior here as it does weird stuff with beginKey
@ -3340,6 +3355,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
RangeResult existingForcePurgeKeys = wait(tr->getRange(blobGranuleForcePurgedKeys, 1));
if (!existingForcePurgeKeys.empty()) {
break;
@ -3468,6 +3484,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
KeyRange nextRange(KeyRangeRef(beginKey, blobGranuleMappingKeys.end));
// using the krm functions can produce incorrect behavior here as it does weird stuff with beginKey
@ -3529,6 +3546,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkManagerLock(tr, bmData));
wait(tr->commit());
break;
@ -3605,6 +3623,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(loadTenantMap(tr, bmData));
break;
} catch (Error& e) {
@ -3864,6 +3883,8 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
state GranuleFiles files;
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
return files;
} catch (Error& e) {
@ -3886,6 +3907,7 @@ ACTOR Future<bool> canDeleteFullGranule(Reference<BlobManagerData> self, UID gra
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
int lim = SERVER_KNOBS->BG_MAX_SPLIT_FANOUT;
if (BUGGIFY_WITH_PROB(0.1)) {
@ -4054,10 +4076,11 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
}
state Transaction tr(self->db);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
KeyRange fileRangeKey = blobGranuleFileKeyRangeFor(granuleId);
if (canDeleteHistoryKey) {
@ -4190,10 +4213,11 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
}
state Transaction tr(self->db);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
for (auto& key : deletedFileKeys) {
tr.clear(key);
@ -4260,14 +4284,15 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
// find all active granules (that comprise the range) and add to the queue
state Transaction tr(self->db);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
if (force) {
// TODO could clean this up after force purge is done, but it's safer not to
self->forcePurgingRanges.insert(range, true);
// set force purged range, to prevent future operations on this range
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// set force purged range, but don't clear mapping range yet, so that if a new BM recovers in the middle
// of purging, it still knows what granules to purge
@ -4324,6 +4349,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
}
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} Fetching latest history entry for range [{1} - {2})\n",
@ -4384,6 +4412,9 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
state Key historyKey = blobGranuleHistoryKeyFor(currRange, startVersion);
state bool foundHistory = false;
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Optional<Value> persistedHistory = wait(tr.get(historyKey));
if (persistedHistory.present()) {
@ -4532,6 +4563,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
loop {
try {
// clear mapping range, so that a new BM doesn't try to recover force purged granules, and clients can't
@ -4609,6 +4641,7 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state std::vector<Future<Void>> purges;
state CoalescedKeyRangeMap<std::pair<Version, bool>> purgeMap;
@ -4685,6 +4718,7 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->clear(KeyRangeRef(blobGranulePurgeKeys.begin, keyAfter(lastPurgeKey)));
wait(tr->commit());
break;
@ -4715,6 +4749,7 @@ ACTOR Future<Void> doLockChecks(Reference<BlobManagerData> bmData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkManagerLock(tr, bmData));
wait(tr->commit());
break;


@ -497,6 +497,8 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobWorkerData> bwData, UI
state GranuleFiles files;
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
return files;
} catch (Error& e) {
@ -743,6 +745,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
numIterations++;
@ -941,6 +945,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
try {
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
numIterations++;
@ -1025,6 +1031,8 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// FIXME: proper tenant support in Blob Worker
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Version rv = wait(tr->getReadVersion());
readVersion = rv;
@ -1922,7 +1930,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->bufferedDeltaVersion = startVersion;
metadata->knownCommittedVersion = startVersion;
Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>();
Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>(bwData->db.getPtr());
if (startState.splitParentGranule.present() && startVersion < startState.changeFeedStartVersion) {
// read from parent change feed up until our new change feed is started
@ -2079,7 +2087,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// update this for change feed popped detection
metadata->bufferedDeltaVersion = metadata->activeCFData.get()->getVersion();
Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>();
Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>(bwData->db.getPtr());
changeFeedFuture = bwData->db->getChangeFeedStream(cfData,
cfKey,
@ -2189,7 +2197,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
lastForceFlushVersion = 0;
metadata->forceFlushVersion = NotifiedVersion();
Reference<ChangeFeedData> cfData = makeReference<ChangeFeedData>();
Reference<ChangeFeedData> cfData =
makeReference<ChangeFeedData>(bwData->db.getPtr());
if (!readOldChangeFeed && cfRollbackVersion < startState.changeFeedStartVersion) {
// It isn't possible to roll back across the parent/child feed boundary,
@ -2705,6 +2714,8 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
break;
}
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state KeyRangeRef parentRange(curHistory.value.parentBoundaries[pIdx],
curHistory.value.parentBoundaries[pIdx + 1]);
state Version parentVersion = curHistory.value.parentVersions[pIdx];
@ -3649,6 +3660,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state GranuleStartState info;
info.changeFeedStartVersion = invalidVersion;
@ -4230,12 +4242,23 @@ ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlo
}
}
void handleBlobVersionRequest(Reference<BlobWorkerData> bwData, MinBlobVersionRequest req) {
bwData->db->setDesiredChangeFeedVersion(
std::max<Version>(0, req.grv - (SERVER_KNOBS->TARGET_BW_LAG_UPDATE * SERVER_KNOBS->VERSIONS_PER_SECOND)));
MinBlobVersionReply rep;
rep.version = bwData->db->getMinimumChangeFeedVersion();
bwData->stats.minimumCFVersion = rep.version;
bwData->stats.notAtLatestChangeFeeds = bwData->db->notAtLatestChangeFeeds.size();
req.reply.send(rep);
}
ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
TraceEvent("BlobWorkerRegister", bwData->id);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
// FIXME: should be able to remove this conflict range
@ -4272,6 +4295,7 @@ ACTOR Future<Void> monitorRemoval(Reference<BlobWorkerData> bwData) {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr.get(blobWorkerListKey));
if (!val.present()) {
@ -4309,6 +4333,8 @@ ACTOR Future<Void> runGRVChecks(Reference<BlobWorkerData> bwData) {
tr.reset();
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Version readVersion = wait(tr.getReadVersion());
ASSERT(readVersion >= bwData->grvVersion.get());
bwData->grvVersion.set(readVersion);
@ -4329,6 +4355,7 @@ ACTOR Future<Void> monitorTenants(Reference<BlobWorkerData> bwData) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state KeyBackedRangeResult<std::pair<TenantName, TenantMapEntry>> tenantResults;
wait(store(tenantResults,
TenantMetadata::tenantMap().getRange(tr,
@ -4768,6 +4795,9 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
req.reply.sendError(blob_manager_replaced());
}
}
when(MinBlobVersionRequest req = waitNext(bwInterf.minBlobVersionRequest.getFuture())) {
handleBlobVersionRequest(self, req);
}
when(FlushGranuleRequest req = waitNext(bwInterf.flushGranuleRequest.getFuture())) {
if (self->managerEpochOk(req.managerEpoch)) {
if (BW_DEBUG) {


@ -254,6 +254,7 @@ struct GrvProxyData {
int updateCommitRequests;
NotifiedDouble lastCommitTime;
Version version;
Version minKnownCommittedVersion; // we should ask master for this version.
// Cache of the latest commit versions of storage servers.
@ -286,7 +287,7 @@ struct GrvProxyData {
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
updateCommitRequests(0), lastCommitTime(0), minKnownCommittedVersion(invalidVersion) {}
updateCommitRequests(0), lastCommitTime(0), version(0), minKnownCommittedVersion(invalidVersion) {}
};
ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,
@ -437,7 +438,8 @@ ACTOR Future<Void> getRate(UID myID,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* clientThrottledTags,
PrioritizedTransactionTagMap<GrvTransactionRateInfo>* perTagRateInfo,
GrvProxyStats* stats) {
GrvProxyStats* stats,
GrvProxyData* proxyData) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -462,8 +464,13 @@ ACTOR Future<Void> getRate(UID myID,
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(
myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, detailed)));
reply = brokenPromiseToNever(
db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID,
*inTransactionCount,
*inBatchTransactionCount,
proxyData->version,
*transactionTagCounter,
detailed)));
transactionTagCounter->clear();
expectingDetailedReply = detailed;
}
@ -707,6 +714,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanContext parentSpan
}
GetRawCommittedVersionReply repFromMaster = wait(replyFromMasterFuture);
grvProxyData->version = std::max(grvProxyData->version, repFromMaster.version);
grvProxyData->minKnownCommittedVersion =
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) {
@ -913,7 +921,8 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
&transactionTagCounter,
&clientThrottledTags,
&perTagRateInfo,
&grvProxyData->stats));
&grvProxyData->stats,
grvProxyData));
addActor.send(queueGetReadVersionRequests(db,
&systemQueue,
&defaultQueue,


@ -235,7 +235,9 @@ ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx,
}
// Returns a vector of blob worker interfaces which have been persisted under the system key space
ACTOR Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database cx, bool use_system_priority = false) {
ACTOR Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database cx,
bool use_system_priority = false,
Version* grv = nullptr) {
state Transaction tr(cx);
loop {
if (use_system_priority) {
@ -252,6 +254,9 @@ ACTOR Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database cx, bool
for (int i = 0; i < blobWorkersList.size(); i++) {
blobWorkers.push_back(decodeBlobWorkerListValue(blobWorkersList[i].value));
}
if (grv) {
*grv = tr.getReadVersion().get();
}
return blobWorkers;
} catch (Error& e) {
wait(tr.onError(e));


@ -23,6 +23,7 @@
#include "fdbserver/Ratekeeper.h"
#include "fdbserver/TagThrottler.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/QuietDatabase.h"
#include "flow/OwningResource.h"
#include "flow/actorcompiler.h" // must be last include
@ -38,7 +39,9 @@ const char* limitReasonName[] = { "workload",
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed" };
"storage_server_list_fetch_failed",
"blob_worker_lag",
"blob_worker_missing" };
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
int limitReasonEnd = limitReason_t_end;
@ -56,7 +59,9 @@ const char* limitReasonDesc[] = { "Workload or read performance.",
"Log server running out of space (approaching 100MB limit).",
"Log server running out of space (approaching 5% limit).",
"Storage server durable version falling behind.",
"Unable to fetch storage server list." };
"Unable to fetch storage server list.",
"Blob worker granule version falling behind.",
"No blob workers are reporting metrics." };
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
@ -241,6 +246,83 @@ public:
}
}
ACTOR static Future<Void> monitorBlobWorkers(Ratekeeper* self) {
state std::vector<BlobWorkerInterface> blobWorkers;
state int workerFetchCount = 0;
state double lastStartTime = 0;
state double startTime = 0;
state bool blobWorkerDead = false;
state double lastLoggedTime = 0;
loop {
while (!self->configuration.blobGranulesEnabled) {
wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
}
state Version grv;
state Future<Void> blobWorkerDelay =
delay(SERVER_KNOBS->METRIC_UPDATE_RATE * FLOW_KNOBS->DELAY_JITTER_OFFSET);
int fetchAmount = SERVER_KNOBS->BW_FETCH_WORKERS_INTERVAL /
(SERVER_KNOBS->METRIC_UPDATE_RATE * FLOW_KNOBS->DELAY_JITTER_OFFSET);
if (++workerFetchCount == fetchAmount || blobWorkerDead) {
workerFetchCount = 0;
std::vector<BlobWorkerInterface> _blobWorkers = wait(getBlobWorkers(self->db, true, &grv));
blobWorkers = _blobWorkers;
} else {
grv = self->maxVersion;
}
lastStartTime = startTime;
startTime = now();
if (blobWorkers.size() > 0) {
state std::vector<Future<Optional<MinBlobVersionReply>>> aliveVersions;
aliveVersions.reserve(blobWorkers.size());
for (auto& it : blobWorkers) {
MinBlobVersionRequest req;
req.grv = grv;
aliveVersions.push_back(timeout(brokenPromiseToNever(it.minBlobVersionRequest.getReply(req)),
SERVER_KNOBS->BLOB_WORKER_TIMEOUT));
}
wait(waitForAll(aliveVersions));
Version minVer = grv;
blobWorkerDead = false;
int minIdx = 0;
for (int i = 0; i < blobWorkers.size(); i++) {
if (aliveVersions[i].get().present()) {
if (aliveVersions[i].get().get().version < minVer) {
minVer = aliveVersions[i].get().get().version;
minIdx = i;
}
} else {
blobWorkerDead = true;
minVer = 0;
break;
}
}
if (minVer > 0 && blobWorkers.size() > 0) {
while (!self->blobWorkerVersionHistory.empty() &&
minVer < self->blobWorkerVersionHistory.back().second) {
self->blobWorkerVersionHistory.pop_back();
}
self->blobWorkerVersionHistory.push_back(std::make_pair(now(), minVer));
}
while (self->blobWorkerVersionHistory.size() > SERVER_KNOBS->MIN_BW_HISTORY &&
self->blobWorkerVersionHistory[1].first <
self->blobWorkerVersionHistory.back().first - SERVER_KNOBS->BW_ESTIMATION_INTERVAL) {
self->blobWorkerVersionHistory.pop_front();
}
if (now() - lastLoggedTime > SERVER_KNOBS->BW_RW_LOGGING_INTERVAL) {
lastLoggedTime = now();
TraceEvent("RkMinBlobWorkerVersion")
.detail("BWVersion", minVer)
.detail("MinId", blobWorkers[minIdx].id());
}
}
wait(blobWorkerDelay);
}
}
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state ActorOwningSelfRef<Ratekeeper> pSelf(
new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)));
@ -261,6 +343,9 @@ public:
self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));
self.addActor.send(self.monitorThrottlingChanges());
if (SERVER_KNOBS->BW_THROTTLING_ENABLED) {
self.addActor.send(self.monitorBlobWorkers());
}
self.addActor.send(self.refreshStorageServerCommitCosts());
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
@ -294,6 +379,42 @@ public:
state bool lastLimited = false;
loop choose {
when(wait(timeout)) {
double actualTps = self.smoothReleasedTransactions.smoothRate();
actualTps =
std::max(std::max(1.0, actualTps),
self.smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT);
if (self.actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) {
self.actualTpsHistory.pop_front();
}
self.actualTpsHistory.push_back(actualTps);
if (self.configuration.blobGranulesEnabled && SERVER_KNOBS->BW_THROTTLING_ENABLED) {
Version maxVersion = 0;
int64_t totalReleased = 0;
int64_t batchReleased = 0;
for (auto& it : self.grvProxyInfo) {
maxVersion = std::max(maxVersion, it.second.version);
totalReleased += it.second.totalTransactions;
batchReleased += it.second.batchTransactions;
}
self.version_transactions[maxVersion] =
Ratekeeper::VersionInfo(totalReleased, batchReleased, now());
loop {
auto secondEntry = self.version_transactions.begin();
++secondEntry;
if (secondEntry != self.version_transactions.end() &&
secondEntry->second.created < now() - (2 * SERVER_KNOBS->TARGET_BW_LAG) &&
(self.blobWorkerVersionHistory.empty() ||
secondEntry->first < self.blobWorkerVersionHistory.front().second)) {
self.version_transactions.erase(self.version_transactions.begin());
} else {
break;
}
}
}
self.updateRate(&self.normalLimits);
self.updateRate(&self.batchLimits);
@ -327,6 +448,8 @@ public:
p.totalTransactions = req.totalReleasedTransactions;
p.batchTransactions = req.batchReleasedTransactions;
p.version = req.version;
self.maxVersion = std::max(self.maxVersion, req.version);
p.lastUpdateTime = now();
reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size();
@ -430,6 +553,10 @@ Future<Void> Ratekeeper::monitorThrottlingChanges() {
return tagThrottler->monitorThrottlingChanges();
}
Future<Void> Ratekeeper::monitorBlobWorkers() {
return RatekeeperImpl::monitorBlobWorkers(this);
}
Future<Void> Ratekeeper::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return RatekeeperImpl::run(rkInterf, dbInfo);
}
@ -446,7 +573,8 @@ Ratekeeper::Ratekeeper(UID id, Database db)
SERVER_KNOBS->TARGET_BYTES_PER_TLOG,
SERVER_KNOBS->SPRING_BYTES_TLOG,
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS,
SERVER_KNOBS->TARGET_BW_LAG),
batchLimits(TransactionPriority::BATCH,
"Batch",
SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH,
@ -454,7 +582,9 @@ Ratekeeper::Ratekeeper(UID id, Database db)
SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH,
SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH,
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH) {
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH,
SERVER_KNOBS->TARGET_BW_LAG_BATCH),
maxVersion(0), blobWorkerTime(now()) {
if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) {
tagThrottler = std::make_unique<GlobalTagThrottler>(db, id);
} else {
@ -479,16 +609,11 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
double actualTps = smoothReleasedTransactions.smoothRate();
actualTpsMetric = (int64_t)actualTps;
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this
// value
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back
// up from this value
actualTps =
std::max(std::max(1.0, actualTps), smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT);
if (actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) {
actualTpsHistory.pop_front();
}
actualTpsHistory.push_back(actualTps);
limits->tpsLimit = std::numeric_limits<double>::infinity();
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
@ -509,7 +634,8 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
SERVER_KNOBS->RATEKEEPER_PRINT_LIMIT_REASON &&
(deterministicRandom()->random01() < SERVER_KNOBS->RATEKEEPER_LIMIT_REASON_SAMPLE_RATE);
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
// Look at each storage server's write queue and local rate, compute and store the desired rate
// ratio
for (auto i = storageQueueInfo.begin(); i != storageQueueInfo.end(); ++i) {
auto const& ss = i->value;
if (!ss.valid || !ss.acceptingRequests || (remoteDC.present() && ss.locality.dcId() == remoteDC))
@ -722,6 +848,119 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
break;
}
if (configuration.blobGranulesEnabled && SERVER_KNOBS->BW_THROTTLING_ENABLED) {
Version lastBWVer = 0;
auto lastIter = version_transactions.end();
if (!blobWorkerVersionHistory.empty()) {
lastBWVer = blobWorkerVersionHistory.back().second;
lastIter = version_transactions.lower_bound(lastBWVer);
if (lastIter != version_transactions.end()) {
blobWorkerTime = lastIter->second.created;
} else {
blobWorkerTime = std::max(blobWorkerTime,
now() - (maxVersion - lastBWVer) / (double)SERVER_KNOBS->VERSIONS_PER_SECOND);
}
}
double blobWorkerLag = now() - blobWorkerTime;
if (blobWorkerLag > limits->bwLagTarget / 2 && !blobWorkerVersionHistory.empty()) {
double elapsed = blobWorkerVersionHistory.back().first - blobWorkerVersionHistory.front().first;
Version firstBWVer = blobWorkerVersionHistory.front().second;
ASSERT(lastBWVer >= firstBWVer);
if (elapsed > SERVER_KNOBS->BW_ESTIMATION_INTERVAL / 2) {
auto firstIter = version_transactions.upper_bound(firstBWVer);
if (lastIter != version_transactions.end() && firstIter != version_transactions.begin()) {
--firstIter;
double targetRateRatio;
if (blobWorkerLag > 3 * limits->bwLagTarget) {
targetRateRatio = 0;
} else if (blobWorkerLag > limits->bwLagTarget) {
targetRateRatio = SERVER_KNOBS->BW_LAG_DECREASE_AMOUNT;
} else {
targetRateRatio = SERVER_KNOBS->BW_LAG_INCREASE_AMOUNT;
}
int64_t totalTransactions =
lastIter->second.totalTransactions - firstIter->second.totalTransactions;
int64_t batchTransactions =
lastIter->second.batchTransactions - firstIter->second.batchTransactions;
int64_t normalTransactions = totalTransactions - batchTransactions;
double bwTPS;
if (limits->bwLagTarget == SERVER_KNOBS->TARGET_BW_LAG) {
bwTPS = targetRateRatio * (totalTransactions) / elapsed;
} else {
bwTPS = std::max(0.0, ((targetRateRatio * (totalTransactions)) - normalTransactions) / elapsed);
}
if (bwTPS < limits->tpsLimit) {
if (printRateKeepLimitReasonDetails) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_lag)
.detail("BWLag", blobWorkerLag)
.detail("BWRate", bwTPS)
.detail("Ratio", targetRateRatio)
.detail("Released", totalTransactions)
.detail("Elapsed", elapsed);
}
limits->tpsLimit = bwTPS;
limitReason = limitReason_t::blob_worker_lag;
}
} else if (blobWorkerLag > limits->bwLagTarget) {
double maxTps = 0;
for (int i = 0; i < actualTpsHistory.size(); i++) {
maxTps = std::max(maxTps, actualTpsHistory[i]);
}
double bwProgress =
std::min(elapsed, (lastBWVer - firstBWVer) / (double)SERVER_KNOBS->VERSIONS_PER_SECOND);
double bwTPS = maxTps * bwProgress / elapsed;
if (blobWorkerLag > 3 * limits->bwLagTarget) {
limits->tpsLimit = 0.0;
if (printRateKeepLimitReasonDetails) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_missing)
.detail("LastValid", lastIter != version_transactions.end())
.detail("FirstValid", firstIter != version_transactions.begin());
}
limitReason = limitReason_t::blob_worker_missing;
} else if (bwTPS < limits->tpsLimit) {
if (printRateKeepLimitReasonDetails) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_lag)
.detail("BWLag", blobWorkerLag)
.detail("BWRate", bwTPS)
.detail("MaxTPS", maxTps)
.detail("Progress", bwProgress)
.detail("Elapsed", elapsed);
}
limits->tpsLimit = bwTPS;
limitReason = limitReason_t::blob_worker_lag;
}
}
} else if (blobWorkerLag > 3 * limits->bwLagTarget) {
limits->tpsLimit = 0.0;
if (printRateKeepLimitReasonDetails) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_missing)
.detail("Elapsed", elapsed)
.detail("LastVer", lastBWVer)
.detail("FirstVer", firstBWVer);
;
}
limitReason = limitReason_t::blob_worker_missing;
}
} else if (blobWorkerLag > 3 * limits->bwLagTarget) {
limits->tpsLimit = 0.0;
if (printRateKeepLimitReasonDetails) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_missing)
.detail("BWLag", blobWorkerLag)
.detail("HistorySize", blobWorkerVersionHistory.size());
}
limitReason = limitReason_t::blob_worker_missing;
}
} else {
blobWorkerTime = now();
}
healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
healthMetrics.limitingStorageQueue = limitingStorageQueueStorageServer;
healthMetrics.worstStorageDurabilityLag = worstDurabilityLag;
@ -756,7 +995,8 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
}
if (minSSVer != std::numeric_limits<Version>::max() && maxTLVer != std::numeric_limits<Version>::min()) {
// writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed ; 2 = half TL durable speed
// writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed ; 2 = half TL durable
// speed
writeToReadLatencyLimit =
((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
@ -1066,8 +1306,8 @@ TLogQueueInfo::TLogQueueInfo(UID id)
: valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from
// storageQueueInfO)
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied
// from storageQueueInfO)
lastReply.instanceID = -1;
}
@ -1098,14 +1338,16 @@ RatekeeperLimits::RatekeeperLimits(TransactionPriority priority,
int64_t logTargetBytes,
int64_t logSpringBytes,
double maxVersionDifference,
int64_t durabilityLagTargetVersions)
int64_t durabilityLagTargetVersions,
double bwLagTarget)
: tpsLimit(std::numeric_limits<double>::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
durabilityLagTargetVersions(
durabilityLagTargetVersions +
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not
durabilityLagTargetVersions(durabilityLagTargetVersions +
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions
// are expected to not
// be durable on the storage servers
lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits<double>::infinity()), priority(priority),
context(context), rkUpdateEventCacheHolder(makeReference<EventCacheHolder>("RkUpdate" + context)) {}
lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits<double>::infinity()), bwLagTarget(bwLagTarget),
priority(priority), context(context),
rkUpdateEventCacheHolder(makeReference<EventCacheHolder>("RkUpdate" + context)) {}


@ -41,7 +41,9 @@ Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<std::vector<StorageServerInterface>> getStorageServers(Database const& cx,
bool const& use_system_priority = false);
Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database const& cx, bool const& use_system_priority = false);
Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database const& cx,
bool const& use_system_priority = false,
Version* const& grv = nullptr);
Future<std::vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);


@ -46,6 +46,8 @@ enum limitReason_t {
log_server_min_free_space_ratio,
storage_server_durability_lag, // 10
storage_server_list_fetch_failed,
blob_worker_lag,
blob_worker_missing,
limitReason_t_end
};
@ -111,6 +113,8 @@ struct RatekeeperLimits {
int64_t lastDurabilityLag;
double durabilityLagLimit;
double bwLagTarget;
TransactionPriority priority;
std::string context;
@ -123,7 +127,8 @@ struct RatekeeperLimits {
int64_t logTargetBytes,
int64_t logSpringBytes,
double maxVersionDifference,
int64_t durabilityLagTargetVersions);
int64_t durabilityLagTargetVersions,
double bwLagTarget);
};
class Ratekeeper {
@ -137,6 +142,18 @@ class Ratekeeper {
double lastUpdateTime{ 0.0 };
double lastTagPushTime{ 0.0 };
Version version{ 0 };
};
struct VersionInfo {
int64_t totalTransactions;
int64_t batchTransactions;
double created;
VersionInfo(int64_t totalTransactions, int64_t batchTransactions, double created)
: totalTransactions(totalTransactions), batchTransactions(batchTransactions), created(created) {}
VersionInfo() : totalTransactions(0), batchTransactions(0), created(0.0) {}
};
UID id;
@ -165,6 +182,10 @@ class Ratekeeper {
RatekeeperLimits batchLimits;
Deque<double> actualTpsHistory;
Version maxVersion;
double blobWorkerTime;
std::map<Version, Ratekeeper::VersionInfo> version_transactions;
Deque<std::pair<double, Version>> blobWorkerVersionHistory;
Optional<Key> remoteDC;
Ratekeeper(UID id, Database db);
@ -182,6 +203,7 @@ class Ratekeeper {
void tryAutoThrottleTag(TransactionTag, double rate, double busyness, TagThrottledReason);
void tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag);
Future<Void> monitorThrottlingChanges();
Future<Void> monitorBlobWorkers();
public:
static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);


@ -108,6 +108,7 @@ struct GetRateInfoRequest {
UID requesterID;
int64_t totalReleasedTransactions;
int64_t batchReleasedTransactions;
Version version;
TransactionTagMap<uint64_t> throttledTagCounts;
bool detailed;
@ -117,16 +118,23 @@ struct GetRateInfoRequest {
GetRateInfoRequest(UID const& requesterID,
int64_t totalReleasedTransactions,
int64_t batchReleasedTransactions,
Version version,
TransactionTagMap<uint64_t> throttledTagCounts,
bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions),
batchReleasedTransactions(batchReleasedTransactions), throttledTagCounts(throttledTagCounts),
batchReleasedTransactions(batchReleasedTransactions), version(version), throttledTagCounts(throttledTagCounts),
detailed(detailed) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(
ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply);
serializer(ar,
requesterID,
totalReleasedTransactions,
batchReleasedTransactions,
version,
throttledTagCounts,
detailed,
reply);
}
};


@ -2354,6 +2354,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.granuleAssignmentsRequest);
DUMPTOKEN(recruited.granuleStatusStreamRequest);
DUMPTOKEN(recruited.haltBlobWorker);
DUMPTOKEN(recruited.minBlobVersionRequest);
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady = req.reply;
Future<Void> bw = blobWorker(recruited, blobWorkerReady, dbInfo);