Changed BlobManagerData from a raw pointer (BlobManagerData*) to a ref-counted Reference<BlobManagerData> to fix ASAN issues
This commit is contained in:
parent
6a8e73891f
commit
07f09f1118
|
@ -43,8 +43,6 @@
|
|||
|
||||
#define BM_DEBUG true
|
||||
|
||||
// FIXME: change all BlobManagerData* to Reference<BlobManagerData> to avoid segfaults if core loop gets error
|
||||
|
||||
// TODO add comments + documentation
|
||||
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
|
||||
Arena& ar,
|
||||
|
@ -199,7 +197,7 @@ struct BlobWorkerStats {
|
|||
BlobWorkerStats(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {}
|
||||
};
|
||||
|
||||
struct BlobManagerData {
|
||||
struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
|
||||
UID id;
|
||||
Database db;
|
||||
Optional<Key> dcId;
|
||||
|
@ -287,7 +285,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<ReadYourWritesT
|
|||
|
||||
// Picks a worker with the fewest number of already assigned ranges.
|
||||
// If there is a tie, picks one such worker at random.
|
||||
ACTOR Future<UID> pickWorkerForAssign(BlobManagerData* bmData) {
|
||||
ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData) {
|
||||
// wait until there are BWs to pick from
|
||||
while (bmData->workerStats.size() == 0) {
|
||||
// TODO REMOVE
|
||||
|
@ -326,7 +324,10 @@ ACTOR Future<UID> pickWorkerForAssign(BlobManagerData* bmData) {
|
|||
return eligibleWorkers[idx];
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment assignment, UID workerID, int64_t seqNo) {
|
||||
ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
|
||||
RangeAssignment assignment,
|
||||
UID workerID,
|
||||
int64_t seqNo) {
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("BM {0} {1} range [{2} - {3}) @ ({4}, {5}) to {6}\n",
|
||||
|
@ -468,7 +469,7 @@ ACTOR Future<Void> doRangeAssignment(BlobManagerData* bmData, RangeAssignment as
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
|
||||
loop {
|
||||
// inject delay into range assignments
|
||||
if (BUGGIFY_WITH_PROB(0.05)) {
|
||||
|
@ -549,7 +550,7 @@ ACTOR Future<Void> rangeAssigner(BlobManagerData* bmData) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, Reference<BlobManagerData> bmData) {
|
||||
Optional<Value> currentLockValue = wait(tr->get(blobManagerEpochKey));
|
||||
ASSERT(currentLockValue.present());
|
||||
int64_t currentEpoch = decodeBlobManagerEpochValue(currentLockValue.get());
|
||||
|
@ -571,7 +572,8 @@ ACTOR Future<Void> checkManagerLock(Reference<ReadYourWritesTransaction> tr, Blo
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> writeInitialGranuleMapping(BlobManagerData* bmData, Standalone<VectorRef<KeyRef>> boundaries) {
|
||||
ACTOR Future<Void> writeInitialGranuleMapping(Reference<BlobManagerData> bmData,
|
||||
Standalone<VectorRef<KeyRef>> boundaries) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
// don't do too many in one transaction
|
||||
state int i = 0;
|
||||
|
@ -615,7 +617,7 @@ ACTOR Future<Void> writeInitialGranuleMapping(BlobManagerData* bmData, Standalon
|
|||
|
||||
// FIXME: this does all logic in one transaction. Adding a giant range to an existing database to blobify would
|
||||
// require doing a ton of storage metrics calls, which we should likely split up across multiple transactions.
|
||||
ACTOR Future<Void> monitorClientRanges(BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
|
||||
state Optional<Value> lastChangeKeyValue;
|
||||
loop {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
|
@ -750,7 +752,7 @@ static void downsampleSplit(const Standalone<VectorRef<KeyRef>>& splits,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
||||
ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
|
||||
UID currentWorkerId,
|
||||
KeyRange granuleRange,
|
||||
UID granuleID,
|
||||
|
@ -946,7 +948,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deregisterBlobWorker(BlobManagerData* bmData, BlobWorkerInterface interf) {
|
||||
ACTOR Future<Void> deregisterBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface interf) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
@ -971,7 +973,7 @@ ACTOR Future<Void> deregisterBlobWorker(BlobManagerData* bmData, BlobWorkerInter
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf, bool registered) {
|
||||
ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf, bool registered) {
|
||||
state UID bwId = bwInterf.id();
|
||||
|
||||
// Remove blob worker from stats map so that when we try to find a worker to takeover the range,
|
||||
|
@ -1038,7 +1040,7 @@ ACTOR Future<Void> killBlobWorker(BlobManagerData* bmData, BlobWorkerInterface b
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
|
||||
ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf) {
|
||||
state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno;
|
||||
// outer loop handles reconstructing stream if it got a retryable error
|
||||
loop {
|
||||
|
@ -1142,7 +1144,7 @@ ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerIn
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
|
||||
ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface bwInterf) {
|
||||
try {
|
||||
state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
|
||||
state Future<Void> monitorStatus = monitorBlobWorkerStatus(bmData, bwInterf);
|
||||
|
@ -1191,7 +1193,7 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkBlobWorkerList(BlobManagerData* bmData, Promise<Void> workerListReady) {
|
||||
ACTOR Future<Void> checkBlobWorkerList(Reference<BlobManagerData> bmData, Promise<Void> workerListReady) {
|
||||
loop {
|
||||
// Get list of last known blob workers
|
||||
// note: the list will include every blob worker that the old manager knew about,
|
||||
|
@ -1217,7 +1219,7 @@ ACTOR Future<Void> checkBlobWorkerList(BlobManagerData* bmData, Promise<Void> wo
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
|
||||
state Promise<Void> workerListReady;
|
||||
bmData->addActor.send(checkBlobWorkerList(bmData, workerListReady));
|
||||
wait(workerListReady.getFuture());
|
||||
|
@ -1549,7 +1551,7 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> chaosRangeMover(BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> chaosRangeMover(Reference<BlobManagerData> bmData) {
|
||||
ASSERT(g_network->isSimulated());
|
||||
loop {
|
||||
wait(delay(30.0));
|
||||
|
@ -1602,7 +1604,7 @@ ACTOR Future<Void> chaosRangeMover(BlobManagerData* bmData) {
|
|||
}
|
||||
|
||||
// Returns the number of blob workers on addr
|
||||
int numExistingBWOnAddr(BlobManagerData* self, const AddressExclusion& addr) {
|
||||
int numExistingBWOnAddr(Reference<BlobManagerData> self, const AddressExclusion& addr) {
|
||||
int numExistingBW = 0;
|
||||
for (auto& server : self->workersById) {
|
||||
const NetworkAddress& netAddr = server.second.stableAddress();
|
||||
|
@ -1616,7 +1618,7 @@ int numExistingBWOnAddr(BlobManagerData* self, const AddressExclusion& addr) {
|
|||
}
|
||||
|
||||
// Tries to recruit a blob worker on the candidateWorker process
|
||||
ACTOR Future<Void> initializeBlobWorker(BlobManagerData* self, RecruitBlobWorkerReply candidateWorker) {
|
||||
ACTOR Future<Void> initializeBlobWorker(Reference<BlobManagerData> self, RecruitBlobWorkerReply candidateWorker) {
|
||||
const NetworkAddress& netAddr = candidateWorker.worker.stableAddress();
|
||||
AddressExclusion workerAddr(netAddr.ip, netAddr.port);
|
||||
self->recruitingStream.set(self->recruitingStream.get() + 1);
|
||||
|
@ -1698,7 +1700,7 @@ ACTOR Future<Void> initializeBlobWorker(BlobManagerData* self, RecruitBlobWorker
|
|||
|
||||
// Recruits blob workers in a loop
|
||||
ACTOR Future<Void> blobWorkerRecruiter(
|
||||
BlobManagerData* self,
|
||||
Reference<BlobManagerData> self,
|
||||
Reference<IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>> recruitBlobWorker) {
|
||||
state Future<RecruitBlobWorkerReply> fCandidateWorker;
|
||||
state RecruitBlobWorkerRequest lastRequest;
|
||||
|
@ -1757,7 +1759,7 @@ ACTOR Future<Void> blobWorkerRecruiter(
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> haltBlobGranules(BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> haltBlobGranules(Reference<BlobManagerData> bmData) {
|
||||
std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));
|
||||
std::vector<Future<Void>> deregisterBlobWorkers;
|
||||
for (auto& worker : blobWorkers) {
|
||||
|
@ -1771,7 +1773,7 @@ ACTOR Future<Void> haltBlobGranules(BlobManagerData* bmData) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<GranuleFiles> loadHistoryFiles(BlobManagerData* bmData, UID granuleID) {
|
||||
ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, UID granuleID) {
|
||||
state Transaction tr(bmData->db);
|
||||
state KeyRange range = blobGranuleFileKeyRangeFor(granuleID);
|
||||
state Key startKey = range.begin;
|
||||
|
@ -1791,7 +1793,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(BlobManagerData* bmData, UID granule
|
|||
* also removes the history entry for this granule from the system keyspace
|
||||
* TODO ensure cannot fully delete granule that is still splitting!
|
||||
*/
|
||||
ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyRef historyKey) {
|
||||
ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, KeyRef historyKey) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
|
||||
}
|
||||
|
@ -1863,7 +1865,7 @@ ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyR
|
|||
* file might be deleted. We will need to ensure we don't rely on the granule's startVersion
|
||||
* (that's persisted as part of the key), but rather use the granule's first snapshot's version when needed
|
||||
*/
|
||||
ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId, Version pruneVersion) {
|
||||
ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Version pruneVersion) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Partially deleting granule {0}: init\n", granuleId.toString());
|
||||
}
|
||||
|
@ -1963,7 +1965,11 @@ ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId,
|
|||
* Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done
|
||||
* processing this prune intent.
|
||||
*/
|
||||
ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef endKey, Version pruneVersion, bool force) {
|
||||
ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self,
|
||||
KeyRef startKey,
|
||||
KeyRef endKey,
|
||||
Version pruneVersion,
|
||||
bool force) {
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("pruneRange starting for range [{0} - {1}) @ pruneVersion={2}, force={3}\n",
|
||||
startKey.printable(),
|
||||
|
@ -2223,7 +2229,7 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
|
|||
* improvements we don't really need here (also we need to go over all prune intents anyways in the
|
||||
* case that the timer is up before any new prune intents arrive).
|
||||
*/
|
||||
ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
|
||||
ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
|
||||
// setup bstore
|
||||
try {
|
||||
if (BM_DEBUG) {
|
||||
|
@ -2402,7 +2408,7 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doLockChecks(BlobManagerData* bmData) {
|
||||
ACTOR Future<Void> doLockChecks(Reference<BlobManagerData> bmData) {
|
||||
loop {
|
||||
Promise<Void> check = bmData->doLockCheck;
|
||||
wait(check.getFuture());
|
||||
|
@ -2439,40 +2445,41 @@ ACTOR Future<Void> doLockChecks(BlobManagerData* bmData) {
|
|||
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
|
||||
int64_t epoch) {
|
||||
state BlobManagerData self(deterministicRandom()->randomUniqueID(),
|
||||
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
|
||||
bmInterf.locality.dcId());
|
||||
state Reference<BlobManagerData> self =
|
||||
makeReference<BlobManagerData>(deterministicRandom()->randomUniqueID(),
|
||||
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
|
||||
bmInterf.locality.dcId());
|
||||
|
||||
state Future<Void> collection = actorCollection(self.addActor.getFuture());
|
||||
state Future<Void> collection = actorCollection(self->addActor.getFuture());
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Blob manager {0} starting...\n", epoch);
|
||||
}
|
||||
|
||||
self.epoch = epoch;
|
||||
self->epoch = epoch;
|
||||
|
||||
// although we start the recruiter, we wait until existing workers are ack'd
|
||||
auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
|
||||
dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });
|
||||
self.addActor.send(blobWorkerRecruiter(&self, recruitBlobWorker));
|
||||
self->addActor.send(blobWorkerRecruiter(self, recruitBlobWorker));
|
||||
|
||||
// we need to recover the old blob manager's state (e.g. granule assignments) before
|
||||
// the new blob manager does anything
|
||||
wait(recoverBlobManager(&self));
|
||||
wait(recoverBlobManager(self));
|
||||
|
||||
self.addActor.send(doLockChecks(&self));
|
||||
self.addActor.send(monitorClientRanges(&self));
|
||||
self.addActor.send(rangeAssigner(&self));
|
||||
self.addActor.send(monitorPruneKeys(&self));
|
||||
self->addActor.send(doLockChecks(self));
|
||||
self->addActor.send(monitorClientRanges(self));
|
||||
self->addActor.send(rangeAssigner(self));
|
||||
self->addActor.send(monitorPruneKeys(self));
|
||||
|
||||
if (BUGGIFY) {
|
||||
self.addActor.send(chaosRangeMover(&self));
|
||||
self->addActor.send(chaosRangeMover(self));
|
||||
}
|
||||
|
||||
// TODO probably other things here eventually
|
||||
try {
|
||||
loop choose {
|
||||
when(wait(self.iAmReplaced.getFuture())) {
|
||||
when(wait(self->iAmReplaced.getFuture())) {
|
||||
if (BM_DEBUG) {
|
||||
printf("Blob Manager exiting because it is replaced\n");
|
||||
}
|
||||
|
@ -2484,7 +2491,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
|||
break;
|
||||
}
|
||||
when(state HaltBlobGranulesRequest req = waitNext(bmInterf.haltBlobGranules.getFuture())) {
|
||||
wait(haltBlobGranules(&self));
|
||||
wait(haltBlobGranules(self));
|
||||
req.reply.send(Void());
|
||||
TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("ReqID", req.requesterID);
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue