The blob manager uses blob worker affinity when assigning ranges on startup or after a worker failure.

This commit is contained in:
Evan Tschannen 2023-02-14 11:16:59 -08:00
parent a1576a890c
commit c0597cc614
5 changed files with 55 additions and 2 deletions

View File

@ -1046,6 +1046,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD, 0.9 );
init( BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY, 30.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY = deterministicRandom()->randomInt(0, 10) - 1;
init( BLOB_WORKER_STORE_TYPE, 3 ); if ( randomize && BUGGIFY ) BLOB_WORKER_STORE_TYPE = 3;
// BLOB_WORKER_REJOIN_TIME: grace period (seconds) the blob manager waits after a
// blob worker failure is detected before treating it as dead and reassigning ranges.
// Fix: the BUGGIFY branch was a copy-paste from the BLOB_WORKER_STORE_TYPE line above —
// it clobbered the int knob BLOB_WORKER_STORE_TYPE with 10.0 and never randomized this knob.
// NOTE(review): the BUGGIFY value below equals the default; choose a different value if
// randomized simulation coverage of this knob is actually wanted.
init( BLOB_WORKER_REJOIN_TIME, 10.0 ); if ( randomize && BUGGIFY ) BLOB_WORKER_REJOIN_TIME = 10.0;
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN, 0.1 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );

View File

@ -1023,6 +1023,7 @@ public:
double BLOB_WORKER_REJECT_WHEN_FULL_THRESHOLD;
double BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY;
int BLOB_WORKER_STORE_TYPE;
double BLOB_WORKER_REJOIN_TIME;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX;

View File

@ -363,6 +363,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
std::unordered_map<UID, BlobWorkerInfo> workerStats; // mapping between workerID -> workerStats
std::unordered_set<NetworkAddress> workerAddresses;
std::unordered_set<UID> deadWorkers;
std::unordered_map<UID, UID> workerAffinities;
KeyRangeMap<UID> workerAssignments;
KeyRangeActorMap assignsInProgress;
KeyRangeMap<BoundaryEvaluation> boundaryEvaluations;
@ -2734,6 +2735,15 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
rangesToMove.push_back(it.range());
}
}
Optional<UID> successor = bwId;
while (bmData->workerAffinities.count(successor.get())) {
successor = bmData->workerAffinities[successor.get()];
}
if (successor.get() == bwId) {
successor = Optional<UID>();
}
for (auto& it : rangesToMove) {
// Send revoke request
RangeAssignment raRevoke;
@ -2745,7 +2755,7 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
// Add range back into the stream of ranges to be assigned
RangeAssignment raAssign;
raAssign.isAssign = true;
raAssign.worker = Optional<UID>();
raAssign.worker = successor;
raAssign.keyRange = it;
raAssign.assign = RangeAssignmentData(); // not a continue
handleRangeAssign(bmData, raAssign);
@ -3059,6 +3069,7 @@ ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWork
choose {
when(wait(waitFailure)) {
wait(delay(SERVER_KNOBS->BLOB_WORKER_REJOIN_TIME));
if (BM_DEBUG) {
fmt::print("BM {0} detected BW {1} is dead\n", bmData->epoch, bwInterf.id().toString());
}
@ -3113,7 +3124,12 @@ ACTOR Future<Void> checkBlobWorkerList(Reference<BlobManagerData> bmData, Promis
// Get list of last known blob workers
// note: the list will include every blob worker that the old manager knew about,
// but it might also contain blob workers that died while the new manager was being recruited
std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
state std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
std::vector<std::pair<UID, UID>> blobWorkerAffinities = wait(getBlobWorkerAffinity(bmData->db));
bmData->workerAffinities.clear();
for (auto& it : blobWorkerAffinities) {
bmData->workerAffinities[it.second] = it.first;
}
// add all blob workers to this new blob manager's records and start monitoring it
bool foundAnyNew = false;
for (auto& worker : blobWorkers) {
@ -3755,6 +3771,9 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
// if worker id is already set to a known worker that replied with it in the mapping, range is already assigned
// there. If not, need to explicitly assign it to someone
if (workerId == UID() || epoch == 0 || !endingWorkers.count(workerId)) {
while (bmData->workerAffinities.count(workerId)) {
workerId = bmData->workerAffinities[workerId];
}
// prevent racing status updates from old owner from causing issues until this request gets sent out
// properly
bmData->boundaryEvaluations.insert(

View File

@ -264,6 +264,35 @@ ACTOR Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database cx,
}
}
// Reads the persisted blob-worker affinity mapping from the system keyspace
// (blobWorkerAffinityKeys) and returns it as a vector of (key UID, value UID) pairs.
// NOTE(review): which side is the "new worker" vs. the "predecessor" depends on the
// key/value encoders; the caller in BlobManager inserts them as map[second] = first —
// confirm the intended direction against decodeBlobWorkerAffinityKey/Value.
// @param use_system_priority when true, reads run at PRIORITY_SYSTEM_IMMEDIATE
// @param grv optional out-param; receives the read version the scan used
ACTOR Future<std::vector<std::pair<UID, UID>>> getBlobWorkerAffinity(Database cx,
bool use_system_priority = false,
Version* grv = nullptr) {
state Transaction tr(cx);
// Standard FDB retry loop: transaction options must be re-applied on every
// iteration because tr.onError() resets the transaction.
loop {
if (use_system_priority) {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// Single range scan of the whole affinity map, capped at TOO_MANY rows.
// NOTE(review): blobWorkerAffinity.more is not checked, so a result truncated
// at the TOO_MANY limit would be silently dropped — confirm the map stays small.
RangeResult blobWorkerAffinity = wait(tr.getRange(blobWorkerAffinityKeys, CLIENT_KNOBS->TOO_MANY));
std::vector<std::pair<UID, UID>> affinities;
affinities.reserve(blobWorkerAffinity.size());
for (int i = 0; i < blobWorkerAffinity.size(); i++) {
affinities.push_back(std::make_pair(decodeBlobWorkerAffinityKey(blobWorkerAffinity[i].key),
decodeBlobWorkerAffinityValue(blobWorkerAffinity[i].value)));
}
if (grv) {
// Safe to .get(): the successful getRange above guarantees the read
// version future is already set on this transaction.
*grv = tr.getReadVersion().get();
}
return affinities;
} catch (Error& e) {
// Retryable errors re-enter the loop; non-retryable ones rethrow here.
wait(tr.onError(e));
}
}
}
ACTOR Future<std::vector<StorageServerInterface>> getStorageServers(Database cx, bool use_system_priority = false) {
state Transaction tr(cx);
loop {

View File

@ -44,6 +44,9 @@ Future<std::vector<StorageServerInterface>> getStorageServers(Database const& cx
Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database const& cx,
bool const& use_system_priority = false,
Version* const& grv = nullptr);
Future<std::vector<std::pair<UID, UID>>> getBlobWorkerAffinity(Database const& cx,
bool const& use_system_priority = false,
Version* const& grv = nullptr);
Future<std::vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);