initial thoughts

This commit is contained in:
Suraj Gupta 2021-10-20 11:54:19 -04:00
parent 7d25dab96b
commit 99606482ea
3 changed files with 181 additions and 2 deletions

View File

@ -28,6 +28,7 @@
#include "fdbclient/SystemData.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/IRandom.h"
@ -715,7 +716,32 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
return Void();
}
void killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
ACTOR Future<Void> deregisterBlobWorker(BlobManagerData* bmData, const BlobWorkerInterface& interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
tr->addReadConflictRange(singleKeyRange(blobWorkerListKey));
tr->clear(blobWorkerListKey);
wait(tr->commit());
if (BM_DEBUG) {
printf("Deregistered blob worker %s\n", interf.id().toString().c_str());
}
return Void();
} catch (Error& e) {
if (BM_DEBUG) {
printf("Deregistering blob worker %s got error %s\n", interf.id().toString().c_str(), e.name());
}
wait(tr->onError(e));
}
}
}
void killBlobWorker(BlobManagerData* bmData, const BlobWorkerInterface& bwInterf) {
UID bwId = bwInterf.id();
// Remove blob worker from stats map so that when we try to find a worker to takeover the range,
@ -756,6 +782,8 @@ void killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
}
bmData->addActor.send(
brokenPromiseToNever(bwInterf.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
wait(deregisterBlobWorker(bmData, bwInterf));
}
ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
@ -868,6 +896,7 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
}
}
} catch (Error& e) {
// will blob worker get cleaned up in this case?
if (e.code() == error_code_operation_cancelled) {
throw e;
}
@ -895,6 +924,126 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
return Void();
}
ACTOR Future<Void> ackExistingBlobWorkers(BlobManagerData* bmData) {
// get list of last known blob workers
// note: the list will include every blob worker that the old manager knew about
// but it might also contain blob workers that died while the new manager was being recruited
std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
// add all blob workers to this new blob manager's records
for (auto worker : blobWorkers) {
bmData->workersById[worker.id()] = worker;
bmData->workerStats[worker.id()] = BlobWorkerStats();
// if the worker died when the new BM was being recruited, the monitor will fail and we
// will clean up the dead blob worker (and recruit new ones in place)
bmData->addActor.send(monitorBlobWorker(bmData, worker));
}
// TODO: anyway we can guarantee the monitor ran so that we have a complete set of alive BWs?
// At this point, bmData->workersById is a complete list of blob workers
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
// TODO: if blob manager dies, and while another is being recruited, a blob worker dies, we need to handle sending
// the ranges for the dead blob worker to new workers. For every range persisted in the DB, send an assign request
// to the worker that it was intended for. If the worker indeed had it, its a noop. If the worker didn't have it,
// it'll persist it. If the worker is dead, i.e. it died while a new BM was being recruited, (easy check: just check
// if that worker is still a worker that we know about via getBlobWorkers), let other workers take over its ranges.
//
// TODO: confirm the below are solved now with the new way of checking BWs liveness.
// Problem: when a blob worker dies while a blob manager is being recruited, we wouldn't recruit new BW to replace
// that one, even though we should have. Easy fix: when going through the list of persisted blob workers
// assignments, if any of them are dead, just recruit a new one. Problem: if the blob worker that died didn't have
// any assignments, it wouldn't be in this list, and so we wouldn't rerecruit for it.
// 1. Get existing assignments using blobGranuleMappingKeys.begin
// 2. Get split intentions using blobGranuleSplitKeys
// 3. Start sending assign requests to blob workers
//
//
// Cases to consider:
// - Blob Manager revokes and re-assigns a range but only the revoke went through before BM died. Then,
// granuleMappingKeys still has G->oldBW so
// the worst case here is that we end up giving G back to the oldBW
// - Blob Manager revokes and re-assigns a range but neither went through before BM died. Same as above.
// - While the BM is recovering, a BW dies. Then, when the BM comes back, it seems granuleMappings of the form
// G->deadBW. Since
// we have a complete list of alive BWs by now, we can just reassign these G's to the next best worker
// - BM persisted intent to split ranges but died before sending them. We just have to re-assign these. If any of
// the new workers died, then
// it's one of the cases above.
//
// We have to think about how to recreate bmData->granuleAssignments and the order in which to send these assigns.
// If we apply all of granuleMappingKeys in a keyrangemap and then apply the splits over it, I think that works.
// We can't just send assigns for granuleMappingKeys because entries there might be dated.
//
// We need a special type of assign req (something like continue) that if fails, doesn't try to find a different
// worker. The reason is that if we can guarantee the worker was alive by the time it got the req, then if the req
// failed,
//
state RangeResult blobGranuleMappings;
state RangeResult blobGranuleSplits;
// get assignments
loop {
tr->reset();
try {
wait(checkManagerLock(tr, bmData));
RangeResult _results = wait(krmGetRanges(tr,
blobGranuleMappingKeys.begin,
KeyRange(normalKeys),
GetRangeLimits::ROW_LIMIT_UNLIMITED,
GetRangeLimits::BYTE_LIMIT_UNLIMITED));
blobGranuleMappings = _results;
if (blobGranuleMappings.more) {
}
wait(tr->commit());
} catch (Error& e) {
wait(tr->onError(e));
}
}
// get splits
loop {
tr->reset();
try {
wait(checkManagerLock(tr, bmData));
RangeResult _results = wait(krmGetRanges(tr,
blobGranuleMappingKeys.begin,
KeyRange(normalKeys),
GetRangeLimits::ROW_LIMIT_UNLIMITED,
GetRangeLimits::BYTE_LIMIT_UNLIMITED));
results = _results;
if (results.more) {
}
wait(tr->commit());
} catch (Error& e) {
wait(tr->onError(e));
}
}
for (auto range : blobGranuleMappings) {
bmData->workerAssignments.insert(range.key, decodeBlobGranuleMappingValue(range.value));
}
for (auto range : blobGranuleSplits) {
if bmData
->workerAssignments.insert(range.key, decodeBlobGranuleMappingValue(range.value));
}
/*
for (range : bmData->workerAssignments) {
if (workersbyId.count(range.value)) {
} else {
range.value = UID();
}
}
send assign with worker = range.workerId
*/
return Void();
}
ACTOR Future<Void> chaosRangeMover(BlobManagerData* bmData) {
ASSERT(g_network->isSimulated());
loop {
@ -1122,6 +1271,9 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });
// we need to acknowledge existing blob workers before recruiting any new ones
wait(ackExistingBlobWorkers(&self));
self.addActor.send(blobWorkerRecruiter(&self, recruitBlobWorker));
self.addActor.send(monitorClientRanges(&self));
self.addActor.send(rangeAssigner(&self));

View File

@ -2385,6 +2385,31 @@ ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWork
}
}
ACTOR Future<Void> deregisterBlobWorker(Reference<BlobWorkerData> bwData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
tr->addReadConflictRange(singleKeyRange(blobWorkerListKey));
tr->clear(blobWorkerListKey);
wait(tr->commit());
if (BW_DEBUG) {
printf("Deregistered blob worker %s\n", interf.id().toString().c_str());
}
return Void();
} catch (Error& e) {
if (BW_DEBUG) {
printf("Deregistering blob worker %s got error %s\n", interf.id().toString().c_str(), e.name());
}
wait(tr->onError(e));
}
}
}
ACTOR Future<Void> handleRangeAssign(Reference<BlobWorkerData> bwData,
AssignBlobRangeRequest req,
bool isSelfReassign) {
@ -2552,7 +2577,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
}
when(AssignBlobRangeRequest _req = waitNext(bwInterf.assignBlobRangeRequest.getFuture())) {
++self->stats.rangeAssignmentRequests;
--self->stats.numRangesAssigned;
++self->stats.numRangesAssigned;
state AssignBlobRangeRequest assignReq = _req;
if (BW_DEBUG) {
printf("Worker %s assigned range [%s - %s) @ (%lld, %lld):\n continue=%s\n",

View File

@ -19,6 +19,8 @@
*/
#include <cinttypes>
#include <vector>
#include "fdbclient/SystemData.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/simulator.h"