adding priorities to blob worker and fixing monitoring in blob manager

This commit is contained in:
Josh Slocum 2021-10-05 16:51:19 -05:00
parent 0e043bf456
commit 6a24ef9258
3 changed files with 109 additions and 32 deletions

View File

@ -701,27 +701,14 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData, UID currentWorkerId,
return Void();
}
ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
try {
state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
state ReplyPromiseStream<GranuleStatusReply> statusStream =
bwInterf.granuleStatusStreamRequest.getReplyStream(GranuleStatusStreamRequest(bmData->epoch));
state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno;
loop choose {
when(wait(waitFailure)) {
// FIXME: actually handle this!!
if (BM_DEBUG) {
printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str());
}
// get all of its ranges
// send revoke request to get back all its ranges
// send halt (look at rangeMover)
// send all its ranges to assignranges stream
return Void();
}
when(GranuleStatusReply _rep = waitNext(statusStream.getFuture())) {
GranuleStatusReply rep = _rep;
ACTOR Future<Void> monitorBlobWorkerStatus(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
state KeyRangeMap<std::pair<int64_t, int64_t>> lastSeenSeqno;
loop {
try {
state ReplyPromiseStream<GranuleStatusReply> statusStream =
bwInterf.granuleStatusStreamRequest.getReplyStream(GranuleStatusStreamRequest(bmData->epoch));
loop {
GranuleStatusReply rep = waitNext(statusStream.getFuture());
if (BM_DEBUG) {
printf("BM %lld got status of [%s - %s) @ (%lld, %lld) from BW %s: %s\n",
bmData->epoch,
@ -769,12 +756,65 @@ ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterfac
bmData->addActor.send(maybeSplitRange(bmData, bwInterf.id(), rep.granuleRange));
}
}
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
ASSERT(e.code() != error_code_end_of_stream);
if (e.code() == error_code_connection_failed || e.code() == error_code_request_maybe_delivered) {
// FIXME: this could throw connection_failed and we could catch this and handle it the same as the
// failure detection triggering
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
continue;
} else {
if (BM_DEBUG) {
printf("BM got unexpected error %s monitoring BW %s status\n",
e.name(),
bwInterf.id().toString().c_str());
}
// TODO change back from SevError?
TraceEvent(SevError, "BWStatusMonitoringFailed", bmData->id)
.detail("BlobWorkerID", bwInterf.id())
.error(e);
throw e;
}
}
}
}
// Supervises a single blob worker on behalf of the blob manager: races the
// cluster failure detector against the worker's granule-status monitor.
// Returns Void once the worker is declared failed; rethrows any unexpected
// error after tracing it.
//
// bmData   - shared blob manager state (epoch, trace id)
// bwInterf - interface of the blob worker being monitored
ACTOR Future<Void> monitorBlobWorker(BlobManagerData* bmData, BlobWorkerInterface bwInterf) {
	try {
		// Fires when the failure monitor has not heard from this worker within
		// BLOB_WORKER_TIMEOUT.
		state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
		// Consumes the worker's granule status stream; expected to run until it
		// throws (see the ASSERT below), never to complete with Void.
		state Future<Void> monitorStatus = monitorBlobWorkerStatus(bmData, bwInterf);
		choose {
			when(wait(waitFailure)) {
				// FIXME: actually handle this!!
				if (BM_DEBUG) {
					printf("BM %lld detected BW %s is dead\n", bmData->epoch, bwInterf.id().toString().c_str());
				}
				TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id());
				// get all of its ranges
				// send revoke request to get back all its ranges
				// send halt (look at rangeMover)
				// send all its ranges to assignranges stream
				return Void();
			}
			when(wait(monitorStatus)) {
				// The status monitor surfaces problems by throwing; a clean Void
				// completion indicates a logic error in monitorBlobWorkerStatus.
				ASSERT(false);
				throw internal_error();
			}
		}
	} catch (Error& e) {
		// Actor cancellation must propagate unchanged so cleanup works.
		if (e.code() == error_code_operation_cancelled) {
			throw e;
		}
		// FIXME: forward errors somewhere from here
		if (BM_DEBUG) {
			printf("BM got unexpected error %s monitoring BW %s\n", e.name(), bwInterf.id().toString().c_str());
		}
		// TODO change back from SevError?
		TraceEvent(SevError, "BWMonitoringFailed", bmData->id).detail("BlobWorkerID", bwInterf.id()).error(e);
		throw e;
	}
}

View File

@ -39,7 +39,7 @@
#include "flow/flow.h"
#define BW_DEBUG true
#define BW_REQUEST_DEBUG true
#define BW_REQUEST_DEBUG false
// FIXME: change all BlobWorkerData* to Reference<BlobWorkerData> to avoid segfaults if core loop gets error
@ -491,6 +491,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
Future<BlobFileIndex> previousDeltaFileFuture,
Optional<KeyRange> oldChangeFeedDataComplete,
Optional<UID> oldChangeFeedId) {
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
// potentially kick off delta file commit check, if our version isn't already known to be committed
state uint64_t checkCount = -1;
if (bwData->knownCommittedVersion.get() < currentDeltaVersion) {
@ -527,7 +528,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
wait(bwData->knownCommittedVersion.onChange());
}
BlobFileIndex prev = wait(previousDeltaFileFuture);
wait(yield()); // prevent stack overflow of many chained futures
wait(delay(0, TaskPriority::BlobWorkerUpdateFDB));
// update FDB with new file
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
@ -600,11 +601,14 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
state Arena arena;
state GranuleSnapshot snapshot;
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
loop {
try {
RangeResult res = waitNext(rows.getFuture());
arena.dependsOn(res.arena());
snapshot.append(arena, res.begin(), res.size());
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
@ -613,6 +617,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
}
}
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
if (BW_DEBUG) {
printf("Granule [%s - %s) read %d snapshot rows\n",
keyRange.begin.printable().c_str(),
@ -648,6 +654,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
wait(objectFile->append(serialized.begin(), serialized.size()));
wait(objectFile->finish());
wait(delay(0, TaskPriority::BlobWorkerUpdateFDB));
// object uploaded successfully, save it to system key space
// TODO add conflict range for writes?
state Tuple snapshotFileKey;
@ -750,6 +758,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, R
ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData,
Reference<GranuleMetadata> metadata,
GranuleFiles files) {
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
if (BW_DEBUG) {
printf("Compacting snapshot from blob for [%s - %s)\n",
metadata->keyRange.begin.printable().c_str(),
@ -1044,6 +1053,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// before starting, make sure worker persists range assignment and acquires the granule lock
GranuleChangeFeedInfo _info = wait(metadata->assignFuture);
changeFeedInfo = _info;
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
cfKey = StringRef(changeFeedInfo.changeFeedId.toString());
if (changeFeedInfo.prevChangeFeedId.present()) {
oldCFKey = StringRef(changeFeedInfo.prevChangeFeedId.get().toString());
@ -1105,6 +1117,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
startVersion = newSnapshotFile.version;
metadata->files.snapshotFiles.push_back(newSnapshotFile);
metadata->durableSnapshotVersion.set(startVersion);
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
}
metadata->pendingSnapshotVersion = startVersion;
}
@ -1148,6 +1162,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
}
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
}
if (!inFlightBlobSnapshot.isValid()) {
while (inFlightDeltaFiles.size() > 0) {
@ -1159,7 +1175,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
cfKey,
changeFeedInfo.changeFeedStartVersion,
rollbacksCompleted));
inFlightDeltaFiles.pop_front();
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
} else {
break;
}
@ -1168,7 +1186,9 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// inject delay into reading change feed stream
if (BUGGIFY_WITH_PROB(0.001)) {
wait(delay(deterministicRandom()->random01()));
wait(delay(deterministicRandom()->random01(), TaskPriority::BlobWorkerReadChangeFeed));
} else {
wait(delay(0, TaskPriority::BlobWorkerReadChangeFeed));
}
state Standalone<VectorRef<MutationsAndVersionRef>> mutations;
@ -1288,6 +1308,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->files.snapshotFiles.push_back(completedSnapshot);
metadata->durableSnapshotVersion.set(completedSnapshot.version);
inFlightBlobSnapshot = Future<BlobFileIndex>(); // not valid!
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
}
for (auto& it : inFlightDeltaFiles) {
BlobFileIndex completedDeltaFile = wait(it.future);
@ -1297,6 +1318,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
cfKey,
changeFeedInfo.changeFeedStartVersion,
rollbacksCompleted));
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
}
inFlightDeltaFiles.clear();
@ -1326,6 +1348,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
if (result.present()) {
break;
}
// FIXME: re-trigger this loop if blob manager status stream changes
if (BW_DEBUG) {
printf("Granule [%s - %s)\n, hasn't heard back from BM, re-sending status\n",
metadata->keyRange.begin.printable().c_str(),
@ -1359,24 +1382,31 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// if we're in the old change feed case and can't snapshot but we have enough data to, don't queue
// too many delta files in parallel
while (inFlightDeltaFiles.size() > 10) {
printf("[%s - %s) Waiting on delta file b/c old change feed\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
if (BW_DEBUG) {
printf("[%s - %s) Waiting on delta file b/c old change feed\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
}
BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future);
printf(" [%s - %s) Got completed delta file\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
if (BW_DEBUG) {
printf(" [%s - %s) Got completed delta file\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str());
}
wait(handleCompletedDeltaFile(bwData,
metadata,
completedDeltaFile,
cfKey,
changeFeedInfo.changeFeedStartVersion,
rollbacksCompleted));
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
inFlightDeltaFiles.pop_front();
}
}
snapshotEligible = false;
wait(yield(TaskPriority::BlobWorkerReadChangeFeed));
// finally, after we optionally write delta and snapshot files, add new mutations to buffer
if (!deltas.mutations.empty()) {
if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) {
@ -1685,6 +1715,9 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGran
when(wait(metadata->rollbackCount.onChange())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
}
wait(yield(TaskPriority::DefaultEndpoint));
if (rollbackCount == metadata->rollbackCount.get()) {
break;
} else if (BW_REQUEST_DEBUG) {
@ -1771,8 +1804,6 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(BlobWorkerData* bwData, BlobGran
rep.chunks.push_back(rep.arena, chunk);
bwData->stats.readReqTotalFilesReturned += chunk.deltaFiles.size() + int(chunk.snapshotFile.present());
wait(yield());
}
req.reply.send(rep);
--bwData->stats.activeReadRequests;
@ -2343,6 +2374,9 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
addActor.send(handleRangeAssign(&self, granuleToReassign, true));
}
when(wait(collection)) {
if (BW_DEBUG) {
printf("BW actor collection returned, exiting\n");
}
ASSERT(false);
throw internal_error();
}

View File

@ -100,6 +100,9 @@ enum class TaskPriority {
UpdateStorage = 3000,
CompactCache = 2900,
TLogSpilledPeekReply = 2800,
BlobWorkerReadChangeFeed = 2720,
BlobWorkerUpdateFDB = 2710,
BlobWorkerUpdateStorage = 2700,
FetchKeys = 2500,
RestoreApplierWriteDB = 2310,
RestoreApplierReceiveMutations = 2300,