Throttle the cluster if the blob manager cannot assign ranges (#7900)

* Throttle the cluster if the blob manager cannot assign ranges
* Fixed a number of different bugs which caused ratekeeper to throttle to zero because of blob worker lag
* Fix: do not mark an assignment as blocked if it is cancelled
* Removed asserts to merge the bug fixes
* Fixed formatting
* Restored the old control flow in the storage updater
* The storage updater did not throw errors
* Disabled buggify to see if it fixes CI
Parent: 3a53ec3115
Commit: 493771b6a8
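In brief: ratekeeper now subtracts recovery time from the measured blob worker lag and asks the blob manager whether range assignments are currently blocked; sustained lag then lowers (or zeroes) the transaction rate instead of letting the cluster run away from the blob workers. A rough, self-contained sketch of that control decision (plain C++, illustrative names only, not the actual Ratekeeper code):

// Illustrative sketch only: how a ratekeeper-style controller can turn blob worker
// lag into a TPS limit, assuming a measured blob-worker ingest rate.
#include <algorithm>
#include <limits>

double throttleForBlobWorkerLag(double lagSeconds,      // observed lag, minus recovery time
                                double lagTargetSeconds, // analogous to TARGET_BW_LAG
                                double blobWorkerTps,    // rate the blob workers are absorbing
                                double currentTpsLimit) {
    if (lagSeconds > 3 * lagTargetSeconds) {
        return 0.0; // far behind: stop releasing transactions entirely
    } else if (lagSeconds > lagTargetSeconds) {
        // behind: release slower than the workers absorb (0.9 mirrors BW_LAG_DECREASE_AMOUNT)
        return std::min(currentTpsLimit, blobWorkerTps * 0.9);
    }
    return std::numeric_limits<double>::max(); // caught up: no blob-worker-based limit
}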
@@ -1804,6 +1804,9 @@ DatabaseContext::~DatabaseContext() {
for (auto& it : notAtLatestChangeFeeds) {
it.second->context = nullptr;
}
for (auto& it : changeFeedUpdaters) {
it.second->context = nullptr;
}

TraceEvent("DatabaseContextDestructed", dbId).backtrace();
}

@@ -8711,32 +8714,22 @@ void DatabaseContext::setSharedState(DatabaseSharedState* p) {
}

ACTOR Future<Void> storageFeedVersionUpdater(StorageServerInterface interf, ChangeFeedStorageData* self) {
state Promise<Void> destroyed = self->destroyed;
loop {
if (destroyed.isSet()) {
return Void();
}
if (self->version.get() < self->desired.get()) {
wait(delay(CLIENT_KNOBS->CHANGE_FEED_EMPTY_BATCH_TIME) || self->version.whenAtLeast(self->desired.get()));
if (destroyed.isSet()) {
return Void();
}
if (self->version.get() < self->desired.get()) {
try {
ChangeFeedVersionUpdateReply rep = wait(brokenPromiseToNever(
interf.changeFeedVersionUpdate.getReply(ChangeFeedVersionUpdateRequest(self->desired.get()))));

if (rep.version > self->version.get()) {
self->version.set(rep.version);
}
} catch (Error& e) {
if (e.code() == error_code_server_overloaded) {
if (FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY > CLIENT_KNOBS->CHANGE_FEED_EMPTY_BATCH_TIME) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY -
CLIENT_KNOBS->CHANGE_FEED_EMPTY_BATCH_TIME));
}
} else {
throw e;
if (e.code() != error_code_server_overloaded) {
throw;
}
if (FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY > CLIENT_KNOBS->CHANGE_FEED_EMPTY_BATCH_TIME) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY - CLIENT_KNOBS->CHANGE_FEED_EMPTY_BATCH_TIME));
}
}
}

@@ -8755,16 +8748,19 @@ Reference<ChangeFeedStorageData> DatabaseContext::getStorageData(StorageServerIn
newStorageUpdater->id = interf.id();
newStorageUpdater->interfToken = token;
newStorageUpdater->updater = storageFeedVersionUpdater(interf, newStorageUpdater.getPtr());
changeFeedUpdaters[token] = newStorageUpdater;
newStorageUpdater->context = this;
changeFeedUpdaters[token] = newStorageUpdater.getPtr();
return newStorageUpdater;
}
return it->second;
return Reference<ChangeFeedStorageData>::addRef(it->second);
}

Version DatabaseContext::getMinimumChangeFeedVersion() {
Version minVersion = std::numeric_limits<Version>::max();
for (auto& it : changeFeedUpdaters) {
minVersion = std::min(minVersion, it.second->version.get());
if (it.second->version.get() > 0) {
minVersion = std::min(minVersion, it.second->version.get());
}
}
for (auto& it : notAtLatestChangeFeeds) {
if (it.second->getVersion() > 0) {

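The getMinimumChangeFeedVersion() change above skips updaters that have not yet reported a version (still 0), so an uninitialized feed no longer drags the minimum to zero. A minimal standalone illustration of that pattern (hypothetical names, not FDB code):

// Sketch: take the minimum over feeds that have reported a real version,
// ignoring ones still at 0, and return "no constraint" when none have.
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

int64_t minimumReportedVersion(const std::vector<int64_t>& versions) {
    int64_t minVersion = std::numeric_limits<int64_t>::max();
    for (int64_t v : versions) {
        if (v > 0) {
            minVersion = std::min(minVersion, v);
        }
    }
    return minVersion;
}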
@@ -8782,6 +8778,12 @@ void DatabaseContext::setDesiredChangeFeedVersion(Version v) {
}
}

ChangeFeedStorageData::~ChangeFeedStorageData() {
if (context) {
context->changeFeedUpdaters.erase(interfToken);
}
}

ChangeFeedData::ChangeFeedData(DatabaseContext* context)
: dbgid(deterministicRandom()->randomUniqueID()), context(context), notAtLatest(1) {
if (context) {

@@ -9178,11 +9180,6 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
results->streams.push_back(it.first.changeFeedStream.getReplyStream(req));
}

for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->interfToken);
}
}
results->maxSeenVersion = invalidVersion;
results->storageData.clear();
Promise<Void> refresh = results->refresh;

@@ -9236,6 +9233,8 @@ ACTOR Future<KeyRange> getChangeFeedRange(Reference<DatabaseContext> db, Databas
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Version readVer = wait(tr.getReadVersion());
if (readVer < begin) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));

@@ -9387,11 +9386,6 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,

results->streams.clear();

for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->interfToken);
}
}
results->streams.push_back(interf.changeFeedStream.getReplyStream(req));

results->maxSeenVersion = invalidVersion;

@@ -9511,11 +9505,6 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_change_feed_popped) {
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->interfToken);
}
}
results->streams.clear();
results->storageData.clear();
if (e.code() == error_code_change_feed_popped) {

@@ -9550,11 +9539,6 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
} else {
results->mutations.sendError(e);
results->refresh.sendError(change_feed_cancelled());
for (auto& it : results->storageData) {
if (it->debugGetReferenceCount() == 2) {
db->changeFeedUpdaters.erase(it->interfToken);
}
}
results->streams.clear();
results->storageData.clear();
return Void();

@@ -9682,6 +9666,8 @@ ACTOR static Future<Void> popChangeFeedBackup(Database cx, Key rangeID, Version
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
Optional<Value> val = wait(tr.get(rangeIDKey));
if (val.present()) {

@@ -9801,6 +9787,7 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);

if (tenant.present() && !loadedTenantPrefix) {
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin));

@@ -673,15 +673,18 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DURABILITY_LAG_INCREASE_RATE, 1.001 );
init( STORAGE_SERVER_LIST_FETCH_TIMEOUT, 20.0 );
init( BW_THROTTLING_ENABLED, true );
init( TARGET_BW_LAG, 50.0 );
init( TARGET_BW_LAG_BATCH, 20.0 );
init( TARGET_BW_LAG_UPDATE, 9.0 );

bool buggifySmallBWLag = false; //randomize && BUGGIFY;
init( TARGET_BW_LAG, 50.0 ); if(buggifySmallBWLag) TARGET_BW_LAG = 10.0;
init( TARGET_BW_LAG_BATCH, 20.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
init( TARGET_BW_LAG_UPDATE, 9.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_UPDATE = 1.0;
init( MIN_BW_HISTORY, 10 );
init( BW_ESTIMATION_INTERVAL, 10.0 );
init( BW_ESTIMATION_INTERVAL, 10.0 ); if(buggifySmallBWLag) BW_ESTIMATION_INTERVAL = 2.0;
init( BW_LAG_INCREASE_AMOUNT, 1.1 );
init( BW_LAG_DECREASE_AMOUNT, 0.9 );
init( BW_FETCH_WORKERS_INTERVAL, 5.0 );
init( BW_RW_LOGGING_INTERVAL, 5.0 );
init( BW_MAX_BLOCKED_INTERVAL, 10.0 ); if(buggifySmallBWLag) BW_MAX_BLOCKED_INTERVAL = 2.0;

init( MAX_AUTO_THROTTLED_TRANSACTION_TAGS, 5 ); if(randomize && BUGGIFY) MAX_AUTO_THROTTLED_TRANSACTION_TAGS = 1;
init( MAX_MANUAL_THROTTLED_TRANSACTION_TAGS, 40 ); if(randomize && BUGGIFY) MAX_MANUAL_THROTTLED_TRANSACTION_TAGS = 1;

@@ -168,10 +168,10 @@ struct ChangeFeedStorageData : ReferenceCounted<ChangeFeedStorageData> {
Future<Void> updater;
NotifiedVersion version;
NotifiedVersion desired;
Promise<Void> destroyed;
UID interfToken;
DatabaseContext* context;

~ChangeFeedStorageData() { destroyed.send(Void()); }
~ChangeFeedStorageData();
};

struct ChangeFeedData : ReferenceCounted<ChangeFeedData> {

@@ -477,7 +477,7 @@ public:
std::unordered_map<UID, Reference<TSSMetrics>> tssMetrics;
// map from changeFeedId -> changeFeedRange
std::unordered_map<Key, KeyRange> changeFeedCache;
std::unordered_map<UID, Reference<ChangeFeedStorageData>> changeFeedUpdaters;
std::unordered_map<UID, ChangeFeedStorageData*> changeFeedUpdaters;
std::map<UID, ChangeFeedData*> notAtLatestChangeFeeds;

Reference<ChangeFeedStorageData> getStorageData(StorageServerInterface interf);

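The switch above from Reference<ChangeFeedStorageData> to a raw ChangeFeedStorageData* map, together with the new ChangeFeedStorageData destructor and the DatabaseContext destructor changes earlier in the diff, breaks a reference-counting cycle: the map no longer owns the entries, each entry erases itself from the map when its last strong reference goes away, and the context clears the back-pointers if it dies first. A small standalone sketch of that ownership pattern (illustrative types, not FDB code):

// Sketch: a registry of non-owning raw pointers where entries unregister themselves.
#include <memory>
#include <unordered_map>

struct Registry;

struct Entry {
    Registry* registry = nullptr; // back-pointer, cleared if the registry dies first
    int id = 0;
    ~Entry();
};

struct Registry {
    std::unordered_map<int, Entry*> entries; // non-owning
    ~Registry() {
        for (auto& [id, e] : entries)
            e->registry = nullptr; // mirrors ~DatabaseContext clearing context pointers
    }
};

Entry::~Entry() {
    if (registry)
        registry->entries.erase(id); // mirrors ~ChangeFeedStorageData erasing interfToken
}

int main() {
    Registry reg;
    auto e = std::make_shared<Entry>();
    e->registry = &reg;
    e->id = 1;
    reg.entries[1] = e.get();
    e.reset(); // the entry unregisters itself; reg.entries is now empty
}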
@@ -640,6 +640,7 @@ public:
double BW_LAG_DECREASE_AMOUNT;
double BW_FETCH_WORKERS_INTERVAL;
double BW_RW_LOGGING_INTERVAL;
double BW_MAX_BLOCKED_INTERVAL;

// disk snapshot
int64_t MAX_FORKED_PROCESS_OUTPUT;

@@ -925,14 +925,19 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
if (e.code() == error_code_blob_worker_full) {
CODE_PROBE(true, "blob worker too full");
ASSERT(assignment.isAssign);
if (assignment.previousFailure.present() &&
assignment.previousFailure.get().second.code() == error_code_blob_worker_full) {
// if previous assignment also failed due to blob_worker_full, multiple workers are full, so wait even
// longer
CODE_PROBE(true, "multiple blob workers too full");
wait(delayJittered(10.0));
} else {
wait(delayJittered(1.0)); // wait a bit before retrying
try {
if (assignment.previousFailure.present() &&
assignment.previousFailure.get().second.code() == error_code_blob_worker_full) {
// if previous assignment also failed due to blob_worker_full, multiple workers are full, so wait
// even longer
CODE_PROBE(true, "multiple blob workers too full");
wait(delayJittered(10.0));
} else {
wait(delayJittered(1.0)); // wait a bit before retrying
}
} catch (Error& e) {
--bmData->stats.blockedAssignments;
throw;
}
}

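The new try/catch above ensures that if the assignment actor is cancelled while waiting to retry, bmData->stats.blockedAssignments is decremented instead of leaving a cancelled assignment counted as blocked (the "do not mark an assignment as blocked if it is cancelled" fix from the commit message). A rough standalone illustration of that bookkeeping, using an RAII guard in place of the actor-style try/catch (hypothetical names, not FDB code):

// Sketch: keep a "blocked" gauge accurate even if the waiting task is abandoned,
// by pairing the increment with a guaranteed decrement.
#include <cassert>
#include <stdexcept>

struct BlockedGauge {
    long long value = 0;
};

struct BlockedScope {
    BlockedGauge& gauge;
    explicit BlockedScope(BlockedGauge& g) : gauge(g) { ++gauge.value; }
    ~BlockedScope() { --gauge.value; } // runs on normal exit and on unwinding
};

void assignWithBackoff(BlockedGauge& gauge, bool previousFailureWasWorkerFull) {
    BlockedScope scope(gauge);
    double delaySeconds = previousFailureWasWorkerFull ? 10.0 : 1.0; // longer wait if several workers are full
    (void)delaySeconds; // a real implementation would sleep with jitter here, then retry the assignment
}

int main() {
    BlockedGauge gauge;
    try {
        assignWithBackoff(gauge, true);
    } catch (const std::exception&) {
        // even on failure the gauge returns to zero
    }
    assert(gauge.value == 0);
}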
@@ -5035,6 +5040,15 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
// Simulation validation that multiple blob managers aren't started with the same epoch
static std::map<int64_t, UID> managerEpochsSeen;

ACTOR Future<Void> checkBlobManagerEpoch(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int64_t epoch, UID dbgid) {
loop {
if (dbInfo->get().blobManager.present() && dbInfo->get().blobManager.get().epoch > epoch) {
throw worker_removed();
}
wait(dbInfo->onChange());
}
}

ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
int64_t epoch) {

@@ -5050,7 +5064,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
managerEpochsSeen[epoch] = bmInterf.id();
}
state Reference<BlobManagerData> self =
makeReference<BlobManagerData>(deterministicRandom()->randomUniqueID(),
makeReference<BlobManagerData>(bmInterf.id(),
dbInfo,
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
bmInterf.locality.dcId(),

@@ -5071,10 +5085,11 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });

self->addActor.send(blobWorkerRecruiter(self, recruitBlobWorker));
self->addActor.send(checkBlobManagerEpoch(dbInfo, epoch, bmInterf.id()));

// we need to recover the old blob manager's state (e.g. granule assignments) before
// before the new blob manager does anything
wait(recoverBlobManager(self));
wait(recoverBlobManager(self) || collection);

self->addActor.send(doLockChecks(self));
self->addActor.send(monitorClientRanges(self));

@@ -5105,14 +5120,16 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
break;
}
when(state HaltBlobGranulesRequest req = waitNext(bmInterf.haltBlobGranules.getFuture())) {
wait(haltBlobGranules(self));
wait(haltBlobGranules(self) || collection);
req.reply.send(Void());
TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("Epoch", epoch).detail("ReqID", req.requesterID);
break;
}
when(BlobManagerExclusionSafetyCheckRequest exclCheckReq =
waitNext(bmInterf.blobManagerExclCheckReq.getFuture())) {
blobManagerExclusionSafetyCheck(self, exclCheckReq);
when(BlobManagerExclusionSafetyCheckRequest req = waitNext(bmInterf.blobManagerExclCheckReq.getFuture())) {
blobManagerExclusionSafetyCheck(self, req);
}
when(BlobManagerBlockedRequest req = waitNext(bmInterf.blobManagerBlockedReq.getFuture())) {
req.reply.send(BlobManagerBlockedReply(self->stats.blockedAssignments));
}
when(wait(collection)) {
TraceEvent(SevError, "BlobManagerActorCollectionError");

@@ -5123,6 +5140,9 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
} catch (Error& err) {
TraceEvent("BlobManagerDied", bmInterf.id()).errorUnsuppressed(err);
}
// prevent a reference counting cycle
self->assignsInProgress = KeyRangeActorMap();
self->boundaryEvaluations.clear();
return Void();
}

@@ -18,6 +18,7 @@
 * limitations under the License.
 */

#include "fdbclient/ClientKnobs.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/Ratekeeper.h"

@@ -246,7 +247,7 @@ public:
}
}

ACTOR static Future<Void> monitorBlobWorkers(Ratekeeper* self) {
ACTOR static Future<Void> monitorBlobWorkers(Ratekeeper* self, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state std::vector<BlobWorkerInterface> blobWorkers;
state int workerFetchCount = 0;
state double lastStartTime = 0;

@@ -276,6 +277,13 @@ public:
startTime = now();

if (blobWorkers.size() > 0) {
state Future<Optional<BlobManagerBlockedReply>> blockedAssignments;
if (dbInfo->get().blobManager.present()) {
blockedAssignments =
timeout(brokenPromiseToNever(dbInfo->get().blobManager.get().blobManagerBlockedReq.getReply(
BlobManagerBlockedRequest())),
SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
}
state std::vector<Future<Optional<MinBlobVersionReply>>> aliveVersions;
aliveVersions.reserve(blobWorkers.size());
for (auto& it : blobWorkers) {

@@ -284,6 +292,12 @@ public:
aliveVersions.push_back(timeout(brokenPromiseToNever(it.minBlobVersionRequest.getReply(req)),
SERVER_KNOBS->BLOB_WORKER_TIMEOUT));
}
if (blockedAssignments.isValid()) {
wait(success(blockedAssignments));
if (blockedAssignments.get().present() && blockedAssignments.get().get().blockedAssignments == 0) {
self->unblockedAssignmentTime = now();
}
}
wait(waitForAll(aliveVersions));
Version minVer = grv;
blobWorkerDead = false;

@@ -297,10 +311,12 @@ public:
} else {
blobWorkerDead = true;
minVer = 0;
minIdx = i;
break;
}
}
if (minVer > 0 && blobWorkers.size() > 0) {
if (minVer > 0 && blobWorkers.size() > 0 &&
now() - self->unblockedAssignmentTime < SERVER_KNOBS->BW_MAX_BLOCKED_INTERVAL) {
while (!self->blobWorkerVersionHistory.empty() &&
minVer < self->blobWorkerVersionHistory.back().second) {
self->blobWorkerVersionHistory.pop_back();

@@ -316,7 +332,8 @@ public:
lastLoggedTime = now();
TraceEvent("RkMinBlobWorkerVersion")
.detail("BWVersion", minVer)
.detail("MinId", blobWorkers[minIdx].id());
.detail("MaxVer", self->maxVersion)
.detail("MinId", blobWorkers.size() > 0 ? blobWorkers[minIdx].id() : UID());
}
}
wait(blobWorkerDelay);

@@ -344,7 +361,7 @@ public:

self.addActor.send(self.monitorThrottlingChanges());
if (SERVER_KNOBS->BW_THROTTLING_ENABLED) {
self.addActor.send(self.monitorBlobWorkers());
self.addActor.send(self.monitorBlobWorkers(dbInfo));
}
self.addActor.send(self.refreshStorageServerCommitCosts());

@@ -375,6 +392,12 @@ public:

self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();

state bool recovering = dbInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS;
state Version recoveryVersion = std::numeric_limits<Version>::max();
if (recovering) {
self.version_recovery[recoveryVersion] = std::make_pair(now(), Optional<double>());
}

try {
state bool lastLimited = false;
loop choose {

@@ -414,6 +437,9 @@ public:
}
}
}
while (self.version_recovery.size() > CLIENT_KNOBS->MAX_GENERATIONS) {
self.version_recovery.erase(self.version_recovery.begin());
}

self.updateRate(&self.normalLimits);
self.updateRate(&self.batchLimits);

@@ -450,6 +476,15 @@ public:
p.batchTransactions = req.batchReleasedTransactions;
p.version = req.version;
self.maxVersion = std::max(self.maxVersion, req.version);

if (recoveryVersion == std::numeric_limits<Version>::max() &&
self.version_recovery.count(recoveryVersion)) {
recoveryVersion = self.maxVersion;
self.version_recovery[recoveryVersion] =
self.version_recovery[std::numeric_limits<Version>::max()];
self.version_recovery.erase(std::numeric_limits<Version>::max());
}

p.lastUpdateTime = now();

reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size();

@@ -496,6 +531,27 @@ public:
}
when(wait(err.getFuture())) {}
when(wait(dbInfo->onChange())) {
if (!recovering && dbInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
recovering = true;
recoveryVersion = self.maxVersion;
if (recoveryVersion == 0) {
recoveryVersion = std::numeric_limits<Version>::max();
}
if (self.version_recovery.count(recoveryVersion)) {
auto& it = self.version_recovery[recoveryVersion];
double existingEnd = it.second.present() ? it.second.get() : now();
double existingDuration = existingEnd - it.first;
self.version_recovery[recoveryVersion] =
std::make_pair(now() - existingDuration, Optional<double>());
} else {
self.version_recovery[recoveryVersion] = std::make_pair(now(), Optional<double>());
}
}
if (recovering && dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
recovering = false;
self.version_recovery[recoveryVersion].second = now();
}

if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) {
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
tlogTrackers = std::vector<Future<Void>>();

@@ -553,8 +609,8 @@ Future<Void> Ratekeeper::monitorThrottlingChanges() {
return tagThrottler->monitorThrottlingChanges();
}

Future<Void> Ratekeeper::monitorBlobWorkers() {
return RatekeeperImpl::monitorBlobWorkers(this);
Future<Void> Ratekeeper::monitorBlobWorkers(Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return RatekeeperImpl::monitorBlobWorkers(this, dbInfo);
}

Future<Void> Ratekeeper::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {

@@ -584,7 +640,7 @@ Ratekeeper::Ratekeeper(UID id, Database db)
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH,
SERVER_KNOBS->TARGET_BW_LAG_BATCH),
maxVersion(0), blobWorkerTime(now()) {
maxVersion(0), blobWorkerTime(now()), unblockedAssignmentTime(now()) {
if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) {
tagThrottler = std::make_unique<GlobalTagThrottler>(db, id);
} else {

@@ -861,7 +917,7 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
now() - (maxVersion - lastBWVer) / (double)SERVER_KNOBS->VERSIONS_PER_SECOND);
}
}
double blobWorkerLag = now() - blobWorkerTime;
double blobWorkerLag = (now() - blobWorkerTime) - getRecoveryDuration(lastBWVer);
if (blobWorkerLag > limits->bwLagTarget / 2 && !blobWorkerVersionHistory.empty()) {
double elapsed = blobWorkerVersionHistory.back().first - blobWorkerVersionHistory.front().first;
Version firstBWVer = blobWorkerVersionHistory.front().second;

@@ -873,6 +929,8 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
double targetRateRatio;
if (blobWorkerLag > 3 * limits->bwLagTarget) {
targetRateRatio = 0;
// ASSERT(!g_network->isSimulated() || limits->bwLagTarget != SERVER_KNOBS->TARGET_BW_LAG ||
// now() < FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS + 50);
} else if (blobWorkerLag > limits->bwLagTarget) {
targetRateRatio = SERVER_KNOBS->BW_LAG_DECREASE_AMOUNT;
} else {

@@ -898,7 +956,9 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
.detail("BWRate", bwTPS)
.detail("Ratio", targetRateRatio)
.detail("Released", totalTransactions)
.detail("Elapsed", elapsed);
.detail("Elapsed", elapsed)
.detail("LastVer", lastBWVer)
.detail("RecoveryDuration", getRecoveryDuration(lastBWVer));
}
limits->tpsLimit = bwTPS;
limitReason = limitReason_t::blob_worker_lag;

@@ -918,9 +978,17 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_missing)
.detail("LastValid", lastIter != version_transactions.end())
.detail("FirstValid", firstIter != version_transactions.begin());
.detail("FirstValid", firstIter != version_transactions.begin())
.detail("FirstVersion",
version_transactions.size() ? version_transactions.begin()->first : -1)
.detail("FirstBWVer", firstBWVer)
.detail("LastBWVer", lastBWVer)
.detail("VerTransactions", version_transactions.size())
.detail("RecoveryDuration", getRecoveryDuration(lastBWVer));
}
limitReason = limitReason_t::blob_worker_missing;
// ASSERT(!g_network->isSimulated() || limits->bwLagTarget != SERVER_KNOBS->TARGET_BW_LAG ||
// now() < FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS + 50);
} else if (bwTPS < limits->tpsLimit) {
if (printRateKeepLimitReasonDetails) {
TraceEvent("RatekeeperLimitReasonDetails")

@@ -942,10 +1010,14 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
.detail("Reason", limitReason_t::blob_worker_missing)
.detail("Elapsed", elapsed)
.detail("LastVer", lastBWVer)
.detail("FirstVer", firstBWVer);
.detail("FirstVer", firstBWVer)
.detail("BWLag", blobWorkerLag)
.detail("RecoveryDuration", getRecoveryDuration(lastBWVer));
;
}
limitReason = limitReason_t::blob_worker_missing;
// ASSERT(!g_network->isSimulated() || limits->bwLagTarget != SERVER_KNOBS->TARGET_BW_LAG ||
// now() < FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS + 50);
}
} else if (blobWorkerLag > 3 * limits->bwLagTarget) {
limits->tpsLimit = 0.0;

@@ -953,12 +1025,16 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
TraceEvent("RatekeeperLimitReasonDetails")
.detail("Reason", limitReason_t::blob_worker_missing)
.detail("BWLag", blobWorkerLag)
.detail("RecoveryDuration", getRecoveryDuration(lastBWVer))
.detail("HistorySize", blobWorkerVersionHistory.size());
}
limitReason = limitReason_t::blob_worker_missing;
// ASSERT(!g_network->isSimulated() || limits->bwLagTarget != SERVER_KNOBS->TARGET_BW_LAG ||
// now() < FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS + 50);
}
} else {
blobWorkerTime = now();
unblockedAssignmentTime = now();
}

healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;

@@ -32,11 +32,13 @@ struct BlobManagerInterface {
RequestStream<struct HaltBlobManagerRequest> haltBlobManager;
RequestStream<struct HaltBlobGranulesRequest> haltBlobGranules;
RequestStream<struct BlobManagerExclusionSafetyCheckRequest> blobManagerExclCheckReq;
RequestStream<struct BlobManagerBlockedRequest> blobManagerBlockedReq;
struct LocalityData locality;
UID myId;
int64_t epoch;

BlobManagerInterface() {}
explicit BlobManagerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
BlobManagerInterface() : epoch(0) {}
BlobManagerInterface(const struct LocalityData& l, UID id, int64_t epoch) : locality(l), myId(id), epoch(epoch) {}

void initEndpoints() {}
UID id() const { return myId; }

@@ -46,7 +48,15 @@ struct BlobManagerInterface {

template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, haltBlobManager, haltBlobGranules, blobManagerExclCheckReq, locality, myId);
serializer(ar,
waitFailure,
haltBlobManager,
haltBlobGranules,
blobManagerExclCheckReq,
blobManagerBlockedReq,
locality,
myId,
epoch);
}
};

@@ -106,4 +116,29 @@ struct BlobManagerExclusionSafetyCheckRequest {
}
};

struct BlobManagerBlockedReply {
constexpr static FileIdentifier file_identifier = 8078627;
int64_t blockedAssignments;

BlobManagerBlockedReply() : blockedAssignments(0) {}
explicit BlobManagerBlockedReply(int64_t blockedAssignments) : blockedAssignments(blockedAssignments) {}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, blockedAssignments);
}
};

struct BlobManagerBlockedRequest {
constexpr static FileIdentifier file_identifier = 1986387;
ReplyPromise<BlobManagerBlockedReply> reply;

BlobManagerBlockedRequest() {}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};

#endif

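BlobManagerBlockedRequest/Reply above is the new endpoint ratekeeper polls to learn how many range assignments the blob manager currently has blocked; only a reply of zero refreshes the "recently unblocked" timestamp, and a missing reply is treated as possibly blocked. A simplified sketch of that consumer-side bookkeeping (hypothetical names, not the actual ratekeeper code):

// Sketch: track when assignments were last known to be unblocked.
#include <chrono>
#include <optional>

struct Clock {
    static double now() {
        using namespace std::chrono;
        return duration<double>(steady_clock::now().time_since_epoch()).count();
    }
};

struct Monitor {
    double unblockedAssignmentTime = Clock::now();
    double maxBlockedInterval = 10.0; // analogous to BW_MAX_BLOCKED_INTERVAL

    void onBlockedReply(std::optional<long long> blockedAssignments) {
        // A timed-out reply or an absent blob manager leaves the timestamp stale.
        if (blockedAssignments && *blockedAssignments == 0) {
            unblockedAssignmentTime = Clock::now();
        }
    }

    bool assignmentsHealthy() const {
        return Clock::now() - unblockedAssignmentTime < maxBlockedInterval;
    }
};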
@@ -184,10 +184,26 @@ class Ratekeeper {
Deque<double> actualTpsHistory;
Version maxVersion;
double blobWorkerTime;
double unblockedAssignmentTime;
std::map<Version, Ratekeeper::VersionInfo> version_transactions;
std::map<Version, std::pair<double, Optional<double>>> version_recovery;
Deque<std::pair<double, Version>> blobWorkerVersionHistory;
Optional<Key> remoteDC;

double getRecoveryDuration(Version ver) {
auto it = version_recovery.lower_bound(ver);
double recoveryDuration = 0;
while (it != version_recovery.end()) {
if (it->second.second.present()) {
recoveryDuration += it->second.second.get() - it->second.first;
} else {
recoveryDuration += now() - it->second.first;
}
++it;
}
return recoveryDuration;
}

Ratekeeper(UID id, Database db);

Future<Void> configurationMonitor();

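getRecoveryDuration() above sums the lengths of all recoveries at or after a given version, counting a still-open recovery up to now(); updateRate() subtracts this from the blob worker lag so time spent in recovery is not blamed on the blob workers. A standalone sketch of the same accounting with a small usage example (plain C++, illustrative names):

// Sketch: recoveries stored as version -> (start, optional end).
#include <cassert>
#include <map>
#include <optional>

double recoveryDuration(const std::map<long long, std::pair<double, std::optional<double>>>& recoveries,
                        long long version,
                        double now) {
    double total = 0;
    for (auto it = recoveries.lower_bound(version); it != recoveries.end(); ++it) {
        const auto& [start, end] = it->second;
        total += (end ? *end : now) - start; // an open-ended recovery counts up to "now"
    }
    return total;
}

int main() {
    std::map<long long, std::pair<double, std::optional<double>>> recoveries;
    recoveries[100] = { 5.0, 8.0 };           // finished recovery: 3 seconds
    recoveries[200] = { 12.0, std::nullopt }; // still recovering at t = 14
    assert(recoveryDuration(recoveries, 100, 14.0) == 5.0);
    assert(recoveryDuration(recoveries, 150, 14.0) == 2.0);
}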
@@ -203,7 +219,7 @@ class Ratekeeper {
void tryAutoThrottleTag(TransactionTag, double rate, double busyness, TagThrottledReason);
void tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag);
Future<Void> monitorThrottlingChanges();
Future<Void> monitorBlobWorkers();
Future<Void> monitorBlobWorkers(Reference<AsyncVar<ServerDBInfo> const> dbInfo);

public:
static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);

@@ -2504,7 +2504,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end);
.detail("End", req.end)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}

if (data->version.get() < req.begin) {

@@ -2550,7 +2551,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("FetchStorageVersion", fetchStorageVersion)
.detail("FetchVersion", feedInfo->fetchVersion)
.detail("DurableFetchVersion", feedInfo->durableFetchVersion.get())
.detail("DurableValidationVersion", durableValidationVersion);
.detail("DurableValidationVersion", durableValidationVersion)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}

if (req.end > emptyVersion + 1) {

@@ -2791,7 +2793,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
.detail("FirstVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.front().version)
.detail("LastVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.back().version)
.detail("Count", reply.mutations.size())
.detail("GotAll", gotAll);
.detail("GotAll", gotAll)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}

if (DEBUG_CF_MISSING(req.rangeID, req.range, req.begin, reply.mutations.back().version) && !req.canReadPopped) {

@@ -2944,7 +2947,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped);
.detail("CanReadPopped", req.canReadPopped)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
data->activeFeedQueries++;

@@ -2965,7 +2969,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("Version", req.begin - 1);
.detail("Version", req.begin - 1)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}

loop {

@@ -2981,7 +2986,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion);
.detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
removeUID = true;
}

@@ -3001,7 +3007,8 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("CanReadPopped", req.canReadPopped)
.detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion);
.detail("Version", blockedVersion.present() ? blockedVersion.get() : data->prevVersion)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
}
std::pair<ChangeFeedStreamReply, bool> _feedReply = wait(feedReplyFuture);

|
|||
when(InitializeBlobManagerRequest req = waitNext(interf.blobManager.getFuture())) {
|
||||
LocalLineage _;
|
||||
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobManager;
|
||||
BlobManagerInterface recruited(locality, req.reqId);
|
||||
BlobManagerInterface recruited(locality, req.reqId, req.epoch);
|
||||
recruited.initEndpoints();
|
||||
|
||||
if (bmEpochAndInterf->get().present() && bmEpochAndInterf->get().get().first == req.epoch) {
|
||||
|
|