diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 109cfed2e4..aa06bc2da9 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -40,7 +40,7 @@ #include "flow/flow.h" #define BW_DEBUG true -#define BW_REQUEST_DEBUG true +#define BW_REQUEST_DEBUG false // TODO add comments + documentation struct BlobFileIndex { @@ -109,13 +109,10 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted { ASSERT(resumeSnapshot.canBeSet()); resumeSnapshot.send(Void()); } - - ~GranuleMetadata() { printf("in dtor for GranuleMetadata\n"); } }; // TODO: rename this struct struct GranuleRangeMetadata { - int id = 0; int64_t lastEpoch; int64_t lastSeqno; Reference activeMetadata; @@ -125,42 +122,16 @@ struct GranuleRangeMetadata { GranuleRangeMetadata() : lastEpoch(0), lastSeqno(0) {} GranuleRangeMetadata(int64_t epoch, int64_t seqno, Reference activeMetadata) - : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) { - /* - if (activeMetadata.isValid()) { - activeMetadata->cancelled.reset(); - } - */ - } - - ~GranuleRangeMetadata() { - printf("GranuleRangeMetadata is being destroyed\n"); - /* - if (activeMetadata.isValid() && activeMetadata->cancelled.canBeSet()) { - printf("Cancelling activeMetadata\n"); - activeMetadata->cancelled.send(Void()); - } - assignFuture.cancel(); - fileUpdaterFuture.cancel(); - */ - /* - if (id == 42) { - printf("GranuleRangeMetadata with id %d is being destroyed\n", id); - sleep(10); - } - */ - } + : lastEpoch(epoch), lastSeqno(seqno), activeMetadata(activeMetadata) {} }; struct BlobWorkerData : NonCopyable, ReferenceCounted { UID id; Database db; - AsyncVar dead; BlobWorkerStats stats; PromiseStream> addActor; - ActorCollection actors{ false }; LocalityData locality; int64_t currentManagerEpoch = -1; @@ -178,8 +149,7 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted { PromiseStream granuleUpdateErrors; - BlobWorkerData(UID id, Database db) - : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL), actors(false) {} + BlobWorkerData(UID id, Database db) : id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL) {} ~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); } bool managerEpochOk(int64_t epoch) { @@ -229,7 +199,6 @@ static void checkGranuleLock(int64_t epoch, int64_t seqno, int64_t ownerEpoch, i ASSERT(epoch < ownerEpoch || (epoch == ownerEpoch && seqno <= ownerSeqno)); // returns true if we still own the lock, false if someone else does - printf("epoch: %lld, seqno: %lld, ownerEpoch: %lld, ownerSeqno: %lld\n", epoch, seqno, ownerEpoch, ownerSeqno); if (epoch != ownerEpoch || seqno != ownerSeqno) { if (BW_DEBUG) { printf("Lock assignment check failed. 
Expected (%lld, %lld), got (%lld, %lld)\n", @@ -578,7 +547,6 @@ ACTOR Future writeDeltaFile(Reference bwData, return BlobFileIndex(currentDeltaVersion, fname, 0, serialized.size()); } catch (Error& e) { numIterations++; - printf("writeDeltaFile error: %s\n", e.name()); wait(tr->onError(e)); } } @@ -586,7 +554,7 @@ ACTOR Future writeDeltaFile(Reference bwData, if (e.code() == error_code_operation_cancelled) { throw e; } - // TODO: do this for writeSnapshot + if (numIterations != 1 || e.code() != error_code_granule_assignment_conflict) { throw e; } @@ -912,15 +880,12 @@ ACTOR Future handleCompletedDeltaFile(Reference bwData, // have completed // FIXME: also have these be async, have each pop change feed wait on the prior one, wait on them before // re-snapshotting - printf("in handleCompletedDeltaFile for BW %s\n", bwData->id.toString().c_str()); Future popFuture = bwData->db->popChangeFeedMutations(cfKey, completedDeltaFile.version); wait(popFuture); - printf("popChangeFeedMutations returned\n"); } while (!rollbacksInProgress.empty() && completedDeltaFile.version >= rollbacksInProgress.front().first) { rollbacksInProgress.pop_front(); } - printf("removed rollbacks\n"); return Void(); } @@ -1071,16 +1036,12 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, state bool snapshotEligible; // just wrote a delta file or just took granule over from another worker state bool justDidRollback = false; - printf( - "metadata %s %s\n", metadata->keyRange.begin.printable().c_str(), metadata->keyRange.end.printable().c_str()); try { // set resume snapshot so it's not valid until we pause to ask the blob manager for a re-snapshot metadata->resumeSnapshot.send(Void()); // before starting, make sure worker persists range assignment and acquires the granule lock - printf("before wait on assignFuture\n"); GranuleChangeFeedInfo _info = wait(assignFuture); - printf("after wait on assignFuture\n"); changeFeedInfo = _info; wait(delay(0, TaskPriority::BlobWorkerUpdateStorage)); @@ -1175,14 +1136,11 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, changeFeedInfo.granuleSplitFrom.get() /*metadata->keyRange*/); } else { readOldChangeFeed = false; - printf("before getChangeFeedStream, my ID is %s\n", bwData->id.toString().c_str()); changeFeedFuture = bwData->db->getChangeFeedStream( changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange); } loop { - printf("bw %s\n", bwData->id.toString().c_str()); - // check outstanding snapshot/delta files for completion if (inFlightBlobSnapshot.isValid() && inFlightBlobSnapshot.isReady()) { BlobFileIndex completedSnapshot = wait(inFlightBlobSnapshot); @@ -1198,8 +1156,6 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, wait(yield(TaskPriority::BlobWorkerUpdateStorage)); } if (!inFlightBlobSnapshot.isValid()) { - printf("!inFlightBlobSnapshot.isValid\n"); - printf("inFlightDeltaFiles.size() = %d\n", inFlightDeltaFiles.size()); while (inFlightDeltaFiles.size() > 0) { if (inFlightDeltaFiles.front().future.isReady()) { BlobFileIndex completedDeltaFile = wait(inFlightDeltaFiles.front().future); @@ -1227,7 +1183,6 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, state Standalone> mutations; if (readOldChangeFeed) { - printf("readOldChangeFeed\n"); Standalone> oldMutations = waitNext(oldChangeFeedStream.getFuture()); // TODO filter old mutations won't be necessary, SS does it already if (filterOldMutations( @@ -1254,12 +1209,8 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } // process mutations - printf("mutations.size() == %d\n", 
mutations.size()); for (MutationsAndVersionRef d : mutations) { state MutationsAndVersionRef deltas = d; - if (deltas.version >= 158685394) { - printf("deltas.version=%lld\n", deltas.version); - } ASSERT(deltas.version >= metadata->bufferedDeltaVersion.get()); // Write a new delta file IF we have enough bytes, and we have all of the previous version's stuff // there to ensure no versions span multiple delta files. Check this by ensuring the version of this @@ -1331,8 +1282,6 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, // bunch of extra delta files at some point, even if we don't consider it for a split yet if (snapshotEligible && metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT && !readOldChangeFeed) { - printf("snapshotEligible && metadata->bytesInNewDeltaFiles >= " - "SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT && !readOldChangeFeed\n"); if (BW_DEBUG && (inFlightBlobSnapshot.isValid() || !inFlightDeltaFiles.empty())) { printf("Granule [%s - %s) ready to re-snapshot, waiting for outstanding %d snapshot and %d " "deltas to " @@ -1399,19 +1348,12 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, when(wait(bwData->currentManagerStatusStream.onChange())) {} } - /* - if (bwData->dead.get()) { - std::cout << "bw detected dead in blobGranuleUpdateFiles" << std::endl; - throw actor_cancelled(); - } - */ if (BW_DEBUG) { printf("Granule [%s - %s)\n, hasn't heard back from BM in BW %s, re-sending status\n", metadata->keyRange.begin.printable().c_str(), metadata->keyRange.end.printable().c_str(), bwData->id.toString().c_str()); } - // wait(yield()); } if (BW_DEBUG) { @@ -1437,8 +1379,6 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata->bytesInNewDeltaFiles = 0; } else if (snapshotEligible && metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) { - printf("snapshotEligible && metadata->bytesInNewDeltaFiles >= " - "SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT\n"); // if we're in the old change feed case and can't snapshot but we have enough data to, don't // queue too many delta files in parallel while (inFlightDeltaFiles.size() > 10) { @@ -1469,7 +1409,6 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, // finally, after we optionally write delta and snapshot files, add new mutations to buffer if (!deltas.mutations.empty()) { - printf("!deltas.mutations.empty()\n"); if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) { // Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed // For correctness right now, there can be no waits and yields either in rollback handling @@ -1586,11 +1525,8 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } } justDidRollback = false; - std::cout << "looping in blobGranuleUpdateFiles" << std::endl; } } catch (Error& e) { - printf("IN CATCH FOR %s blobGranuleUpdateFiles -----\n", bwData->id.toString().c_str()); - printf("error is %s\n", e.name()); if (e.code() == error_code_operation_cancelled) { throw; } @@ -1682,19 +1618,19 @@ static Future waitForVersion(Reference metadata, Version // if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete, // nothing to do - if (v >= 162692991) { - printf(" [%s - %s) waiting for %lld\n readable:%s\n bufferedDelta=%lld\n pendingDelta=%lld\n " - "durableDelta=%lld\n pendingSnapshot=%lld\n durableSnapshot=%lld\n", - metadata->keyRange.begin.printable().c_str(), - metadata->keyRange.end.printable().c_str(), - v, 
- metadata->readable.isSet() ? "T" : "F", - metadata->bufferedDeltaVersion.get(), - metadata->pendingDeltaVersion, - metadata->durableDeltaVersion.get(), - metadata->pendingSnapshotVersion, - metadata->durableSnapshotVersion.get()); - } + /* + printf(" [%s - %s) waiting for %lld\n readable:%s\n bufferedDelta=%lld\n pendingDelta=%lld\n " + "durableDelta=%lld\n pendingSnapshot=%lld\n durableSnapshot=%lld\n", + metadata->keyRange.begin.printable().c_str(), + metadata->keyRange.end.printable().c_str(), + v, + metadata->readable.isSet() ? "T" : "F", + metadata->bufferedDeltaVersion.get(), + metadata->pendingDeltaVersion, + metadata->durableDeltaVersion.get(), + metadata->pendingSnapshotVersion, + metadata->durableSnapshotVersion.get()); + */ if (metadata->readable.isSet() && v <= metadata->bufferedDeltaVersion.get() && (v <= metadata->durableDeltaVersion.get() || @@ -1708,8 +1644,6 @@ static Future waitForVersion(Reference metadata, Version } ACTOR Future handleBlobGranuleFileRequest(Reference bwData, BlobGranuleFileRequest req) { - printf( - "In handleBlobGranuleFileRequest for BW %s @ version %lld\n", bwData->id.toString().c_str(), req.readVersion); try { // TODO REMOVE in api V2 ASSERT(req.beginVersion == 0); @@ -1732,7 +1666,6 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData req.keyRange.end.printable().c_str()); } - printf("lastRangeEnd < r.begin() || !isValid\n"); throw wrong_shard_server(); } granules.push_back(r.value().activeMetadata); @@ -1747,16 +1680,11 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData req.keyRange.end.printable().c_str()); } - printf("lastRangeEnd < req.keyRange.end\n"); throw wrong_shard_server(); } // do work for each range for (auto m : granules) { - if (req.readVersion >= 162692991) { - printf( - "For BW %s, granule: %s\n", bwData->id.toString().c_str(), m->keyRange.begin.printable().c_str()); - } state Reference metadata = m; // try to check version_too_old, cancelled, waitForVersion without yielding first if (metadata->readable.isSet() && @@ -1773,7 +1701,6 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData throw transaction_too_old(); } if (metadata->cancelled.isSet()) { - printf("metadata->cancelled.isSet()\n"); throw wrong_shard_server(); } @@ -1785,19 +1712,10 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData } // rollback resets all of the version information, so we have to redo wait for version on rollback state int rollbackCount = metadata->rollbackCount.get(); - if (req.readVersion >= 162692991) { - printf("For BW %s, before choose\n", bwData->id.toString().c_str()); - } choose { when(wait(waitForVersionFuture)) {} when(wait(metadata->rollbackCount.onChange())) {} - when(wait(metadata->cancelled.getFuture())) { - printf("metadata->cancelled.getFuture()\n"); - throw wrong_shard_server(); - } - } - if (req.readVersion >= 162692991) { - printf("For BW %s, after choose\n", bwData->id.toString().c_str()); + when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); } } if (rollbackCount == metadata->rollbackCount.get()) { @@ -1810,10 +1728,6 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData } } - if (req.readVersion >= 162692991) { - printf("For BW %s, after loop\n", bwData->id.toString().c_str()); - } - // granule is up to date, do read BlobGranuleChunkRef chunk; @@ -1911,7 +1825,6 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData ACTOR Future persistAssignWorkerRange(Reference bwData, AssignBlobRangeRequest req) { - printf("in persistAssignWorkerRange\n"); 
ASSERT(!req.continueAssignment); state Transaction tr(bwData->db); state Key lockKey = granuleLockKey(req.keyRange); @@ -1923,158 +1836,149 @@ ACTOR Future persistAssignWorkerRange(Reference prevLockValue = wait(tr.get(lockKey)); - state bool hasPrevOwner = prevLockValue.present(); - if (hasPrevOwner) { - std::tuple prevOwner = decodeBlobGranuleLockValue(prevLockValue.get()); - acquireGranuleLock( - req.managerEpoch, req.managerSeqno, std::get<0>(prevOwner), std::get<1>(prevOwner)); - info.changeFeedId = std::get<2>(prevOwner); + // FIXME: could add list of futures and do the different parts that are disjoint in parallel? + info.changeFeedStartVersion = invalidVersion; + Optional prevLockValue = wait(tr.get(lockKey)); + state bool hasPrevOwner = prevLockValue.present(); + if (hasPrevOwner) { + std::tuple prevOwner = decodeBlobGranuleLockValue(prevLockValue.get()); + acquireGranuleLock(req.managerEpoch, req.managerSeqno, std::get<0>(prevOwner), std::get<1>(prevOwner)); + info.changeFeedId = std::get<2>(prevOwner); - GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, req.keyRange)); - info.existingFiles = granuleFiles; - info.doSnapshot = false; + GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, req.keyRange)); + info.existingFiles = granuleFiles; + info.doSnapshot = false; - if (info.existingFiles.get().snapshotFiles.empty()) { - ASSERT(info.existingFiles.get().deltaFiles.empty()); - info.previousDurableVersion = invalidVersion; - info.doSnapshot = true; - } else if (info.existingFiles.get().deltaFiles.empty()) { - info.previousDurableVersion = info.existingFiles.get().snapshotFiles.back().version; - } else { - info.previousDurableVersion = info.existingFiles.get().deltaFiles.back().version; - } - - // for the non-splitting cases, this doesn't need to be 100% accurate, it just needs to be - // smaller than the next delta file write. 
- info.changeFeedStartVersion = info.previousDurableVersion; - } else { - // else we are first, no need to check for owner conflict - // FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then - // that to bytes - info.changeFeedId = deterministicRandom()->randomUniqueID(); - wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange)); - info.doSnapshot = true; + if (info.existingFiles.get().snapshotFiles.empty()) { + ASSERT(info.existingFiles.get().deltaFiles.empty()); info.previousDurableVersion = invalidVersion; - } - - tr.set(lockKey, blobGranuleLockValueFor(req.managerEpoch, req.managerSeqno, info.changeFeedId)); - wait(krmSetRange( - &tr, blobGranuleMappingKeys.begin, req.keyRange, blobGranuleMappingValueFor(bwData->id))); - - Tuple historyKey; - historyKey.append(req.keyRange.begin).append(req.keyRange.end); - Optional parentGranulesValue = - wait(tr.get(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin))); - - // If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId, - // and the previous durable version will come from the previous granules - if (parentGranulesValue.present()) { - state Standalone> parentGranules = - decodeBlobGranuleHistoryValue(parentGranulesValue.get()); - // TODO REMOVE - if (BW_DEBUG) { - printf("Decoded parent granules for [%s - %s)\n", - req.keyRange.begin.printable().c_str(), - req.keyRange.end.printable().c_str()); - for (auto& pg : parentGranules) { - printf(" [%s - %s)\n", pg.begin.printable().c_str(), pg.end.printable().c_str()); - } - } - - // TODO change this for merge - ASSERT(parentGranules.size() == 1); - - state std::pair granuleSplitState; - if (hasPrevOwner) { - std::pair _st = - wait(getGranuleSplitState(&tr, parentGranules[0], req.keyRange)); - granuleSplitState = _st; - } else { - granuleSplitState = std::pair(BlobGranuleSplitState::Started, invalidVersion); - } - - ASSERT(!hasPrevOwner || granuleSplitState.first > BlobGranuleSplitState::Started); - - // if granule wasn't done with old change feed, load it - if (granuleSplitState.first < BlobGranuleSplitState::Done) { - Optional prevGranuleLockValue = wait(tr.get(granuleLockKey(parentGranules[0]))); - ASSERT(prevGranuleLockValue.present()); - std::tuple prevGranuleLock = - decodeBlobGranuleLockValue(prevGranuleLockValue.get()); - info.prevChangeFeedId = std::get<2>(prevGranuleLock); - info.granuleSplitFrom = parentGranules[0]; - - if (granuleSplitState.first == BlobGranuleSplitState::Assigned) { - // was already assigned, use change feed start version - ASSERT(granuleSplitState.second != invalidVersion); - info.changeFeedStartVersion = granuleSplitState.second; - } else if (granuleSplitState.first == BlobGranuleSplitState::Started) { - wait(updateGranuleSplitState(&tr, - parentGranules[0], - req.keyRange, - info.prevChangeFeedId.get(), - BlobGranuleSplitState::Assigned)); - // change feed was created as part of this transaction, changeFeedStartVersion will be - // set later - } else { - ASSERT(false); - } - } - - if (info.doSnapshot) { - // only need to do snapshot if no files exist yet for this granule. 
- ASSERT(info.previousDurableVersion == invalidVersion); - // FIXME: store this somewhere useful for time travel reads - GranuleFiles prevFiles = wait(loadPreviousFiles(&tr, parentGranules[0])); - ASSERT(!prevFiles.snapshotFiles.empty() || !prevFiles.deltaFiles.empty()); - - info.blobFilesToSnapshot = prevFiles; - info.previousDurableVersion = info.blobFilesToSnapshot.get().deltaFiles.empty() - ? info.blobFilesToSnapshot.get().snapshotFiles.back().version - : info.blobFilesToSnapshot.get().deltaFiles.back().version; - } - } - - wait(tr.commit()); - - if (info.changeFeedStartVersion == invalidVersion) { - info.changeFeedStartVersion = tr.getCommittedVersion(); + info.doSnapshot = true; + } else if (info.existingFiles.get().deltaFiles.empty()) { + info.previousDurableVersion = info.existingFiles.get().snapshotFiles.back().version; } else { - ASSERT(info.changeFeedStartVersion != invalidVersion); + info.previousDurableVersion = info.existingFiles.get().deltaFiles.back().version; } - TraceEvent("BlobWorkerPersistedAssignment", bwData->id).detail("Granule", req.keyRange); - - return info; - } catch (Error& e) { - if (e.code() == error_code_granule_assignment_conflict) { - throw e; - } - wait(tr.onError(e)); + // for the non-splitting cases, this doesn't need to be 100% accurate, it just needs to be + // smaller than the next delta file write. + info.changeFeedStartVersion = info.previousDurableVersion; + } else { + // else we are first, no need to check for owner conflict + // FIXME: use actual 16 bytes of UID instead of converting it to 32 character string and then + // that to bytes + info.changeFeedId = deterministicRandom()->randomUniqueID(); + wait(tr.registerChangeFeed(StringRef(info.changeFeedId.toString()), req.keyRange)); + info.doSnapshot = true; + info.previousDurableVersion = invalidVersion; } + + tr.set(lockKey, blobGranuleLockValueFor(req.managerEpoch, req.managerSeqno, info.changeFeedId)); + wait(krmSetRange(&tr, blobGranuleMappingKeys.begin, req.keyRange, blobGranuleMappingValueFor(bwData->id))); + + Tuple historyKey; + historyKey.append(req.keyRange.begin).append(req.keyRange.end); + Optional parentGranulesValue = + wait(tr.get(historyKey.getDataAsStandalone().withPrefix(blobGranuleHistoryKeys.begin))); + + // If anything in previousGranules, need to do the handoff logic and set ret.previousChangeFeedId, + // and the previous durable version will come from the previous granules + if (parentGranulesValue.present()) { + state Standalone> parentGranules = + decodeBlobGranuleHistoryValue(parentGranulesValue.get()); + // TODO REMOVE + if (BW_DEBUG) { + printf("Decoded parent granules for [%s - %s)\n", + req.keyRange.begin.printable().c_str(), + req.keyRange.end.printable().c_str()); + for (auto& pg : parentGranules) { + printf(" [%s - %s)\n", pg.begin.printable().c_str(), pg.end.printable().c_str()); + } + } + + // TODO change this for merge + ASSERT(parentGranules.size() == 1); + + state std::pair granuleSplitState; + if (hasPrevOwner) { + std::pair _st = + wait(getGranuleSplitState(&tr, parentGranules[0], req.keyRange)); + granuleSplitState = _st; + } else { + granuleSplitState = std::pair(BlobGranuleSplitState::Started, invalidVersion); + } + + ASSERT(!hasPrevOwner || granuleSplitState.first > BlobGranuleSplitState::Started); + + // if granule wasn't done with old change feed, load it + if (granuleSplitState.first < BlobGranuleSplitState::Done) { + Optional prevGranuleLockValue = wait(tr.get(granuleLockKey(parentGranules[0]))); + ASSERT(prevGranuleLockValue.present()); + 
std::tuple prevGranuleLock = + decodeBlobGranuleLockValue(prevGranuleLockValue.get()); + info.prevChangeFeedId = std::get<2>(prevGranuleLock); + info.granuleSplitFrom = parentGranules[0]; + + if (granuleSplitState.first == BlobGranuleSplitState::Assigned) { + // was already assigned, use change feed start version + ASSERT(granuleSplitState.second != invalidVersion); + info.changeFeedStartVersion = granuleSplitState.second; + } else if (granuleSplitState.first == BlobGranuleSplitState::Started) { + wait(updateGranuleSplitState(&tr, + parentGranules[0], + req.keyRange, + info.prevChangeFeedId.get(), + BlobGranuleSplitState::Assigned)); + // change feed was created as part of this transaction, changeFeedStartVersion will be + // set later + } else { + ASSERT(false); + } + } + + if (info.doSnapshot) { + // only need to do snapshot if no files exist yet for this granule. + ASSERT(info.previousDurableVersion == invalidVersion); + // FIXME: store this somewhere useful for time travel reads + GranuleFiles prevFiles = wait(loadPreviousFiles(&tr, parentGranules[0])); + ASSERT(!prevFiles.snapshotFiles.empty() || !prevFiles.deltaFiles.empty()); + + info.blobFilesToSnapshot = prevFiles; + info.previousDurableVersion = info.blobFilesToSnapshot.get().deltaFiles.empty() + ? info.blobFilesToSnapshot.get().snapshotFiles.back().version + : info.blobFilesToSnapshot.get().deltaFiles.back().version; + } + } + + wait(tr.commit()); + + if (info.changeFeedStartVersion == invalidVersion) { + info.changeFeedStartVersion = tr.getCommittedVersion(); + } else { + ASSERT(info.changeFeedStartVersion != invalidVersion); + } + + TraceEvent("BlobWorkerPersistedAssignment", bwData->id).detail("Granule", req.keyRange); + + return info; + } catch (Error& e) { + if (e.code() == error_code_granule_assignment_conflict) { + throw e; + } + wait(tr.onError(e)); } - } catch (Error& e) { - printf("ERROR IN PERSIST: %s\n", e.name()); - throw; } } -// try to use GranuleMetadata ACTOR Future start(Reference bwData, GranuleRangeMetadata* meta, AssignBlobRangeRequest req) { ASSERT(meta->activeMetadata.isValid()); meta->activeMetadata->originalReq = req; meta->assignFuture = persistAssignWorkerRange(bwData, req); meta->fileUpdaterFuture = blobGranuleUpdateFiles(bwData, meta->activeMetadata, meta->assignFuture); - // bwData->actors.add(meta->fileUpdaterFuture); wait(success(meta->assignFuture)); return Void(); } @@ -2128,7 +2032,6 @@ ACTOR Future changeBlobRange(Reference bwData, bool active, bool disposeOnCleanup, bool selfReassign) { - printf("changeBlobRange called\n"); if (BW_DEBUG) { printf("%s range for [%s - %s): %s @ (%lld, %lld)\n", selfReassign ? "Re-assigning" : "Changing", @@ -2154,7 +2057,9 @@ ACTOR Future changeBlobRange(Reference bwData, for (auto& r : ranges) { if (!active) { if (r.value().activeMetadata.isValid() && r.value().activeMetadata->cancelled.canBeSet()) { - printf("Cancelling activeMetadata\n"); + if (BW_DEBUG) { + printf("Cancelling activeMetadata\n"); + } r.value().activeMetadata->cancelled.send(Void()); } } @@ -2201,7 +2106,7 @@ ACTOR Future changeBlobRange(Reference bwData, GranuleRangeMetadata newMetadata = (active && newerRanges.empty()) ? 
constructActiveBlobRange(bwData, keyRange, epoch, seqno) : constructInactiveBlobRange(epoch, seqno); - newMetadata.id = 42; + bwData->granuleMetadata.insert(keyRange, newMetadata); if (BW_DEBUG) { printf("Inserting new range [%s - %s): %s @ (%lld, %lld)\n", @@ -2304,38 +2209,15 @@ ACTOR Future handleRangeAssign(Reference bwData, if (shouldStart) { auto m = bwData->granuleMetadata.rangeContaining(req.keyRange.begin); ASSERT(m.begin() == req.keyRange.begin && m.end() == req.keyRange.end); - printf("About to start for BW %s\n", bwData->id.toString().c_str()); wait(start(bwData, &m.value(), req)); - /* - int count = 0; - // GranuleRangeMetadata& x; - for (auto& it : m) { - printf("BW %s ABOUT TO WAIT IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str()); - wait(start(bwData, &it.value(), req)); - printf("done waiting in handleRangeAssign\n"); - count++; - } - ASSERT(count == 1); - // x.id = 42; - - printf("BW %s ABOUT TO WAIT IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str()); - // WAITING ON START BUT ITS NOT AN ACTOR!!!!!!! SO WHEN handlerangeassign gets operation_cancelled, - // it won't get propogated to start - // wait(start(bwData, x, req)); - printf("done waiting in handleRangeAssign\n"); - */ } } if (!isSelfReassign) { ASSERT(!req.reply.isSet()); - printf("about to send reply\n"); req.reply.send(AssignBlobRangeReply(true)); - printf("done sending reply\n"); } return Void(); } catch (Error& e) { - printf("BW %s GOT ERROR %s IN HANDLERANGEASSIGN\n", bwData->id.toString().c_str(), e.name()); - state Error eState = e; if (BW_DEBUG) { printf("AssignRange [%s - %s) got error %s\n", req.keyRange.begin.printable().c_str(), @@ -2343,25 +2225,19 @@ ACTOR Future handleRangeAssign(Reference bwData, e.name()); } - // - // if (futureAndNewGranule.get().second.isValid()) { - // wait(futureAndNewGranule.get().second->cancel(false)); - //} - // - if (!isSelfReassign) { - if (canReplyWith(eState)) { - req.reply.sendError(eState); + if (canReplyWith(e)) { + req.reply.sendError(e); } } - throw eState; + throw; } } ACTOR Future handleRangeRevoke(Reference bwData, RevokeBlobRangeRequest req) { try { - bool _ = + bool _shouldStart = wait(changeBlobRange(bwData, req.keyRange, req.managerEpoch, req.managerSeqno, false, req.dispose, false)); req.reply.send(AssignBlobRangeReply(true)); return Void(); @@ -2466,8 +2342,8 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, rep.interf = bwInterf; recruitReply.send(rep); - self->actors.add(waitFailureServer(bwInterf.waitFailure.getFuture())); - self->actors.add(runCommitVersionChecks(self)); + self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture())); + self->addActor.send(runCommitVersionChecks(self)); try { loop choose { @@ -2477,7 +2353,7 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, req.keyRange.end.printable().c_str());*/ ++self->stats.readRequests; ++self->stats.activeReadRequests; - self->actors.add(handleBlobGranuleFileRequest(self, req)); + self->addActor.send(handleBlobGranuleFileRequest(self, req)); } when(state GranuleStatusStreamRequest req = waitNext(bwInterf.granuleStatusStreamRequest.getFuture())) { if (self->managerEpochOk(req.managerEpoch)) { @@ -2505,7 +2381,7 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, } if (self->managerEpochOk(assignReq.managerEpoch)) { - self->actors.add(handleRangeAssign(self, assignReq, false)); + self->addActor.send(handleRangeAssign(self, assignReq, false)); } else { assignReq.reply.send(AssignBlobRangeReply(false)); } @@ -2524,13 +2400,13 @@ ACTOR Future 
blobWorker(BlobWorkerInterface bwInterf, } if (self->managerEpochOk(revokeReq.managerEpoch)) { - self->actors.add(handleRangeRevoke(self, revokeReq)); + self->addActor.send(handleRangeRevoke(self, revokeReq)); } else { revokeReq.reply.send(AssignBlobRangeReply(false)); } } when(AssignBlobRangeRequest granuleToReassign = waitNext(self->granuleUpdateErrors.getFuture())) { - self->actors.add(handleRangeAssign(self, granuleToReassign, true)); + self->addActor.send(handleRangeAssign(self, granuleToReassign, true)); } when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) { req.reply.send(Void()); @@ -2540,25 +2416,19 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, break; } } - // when(wait(delay(10))) { throw granule_assignment_conflict(); } when(wait(collection)) { - if (BW_DEBUG) { - printf("BW actor collection returned, exiting\n"); - } + TraceEvent("BlobWorkerActorCollectionError"); ASSERT(false); - throw granule_assignment_conflict(); + throw internal_error(); } } } catch (Error& e) { if (BW_DEBUG) { - printf("Blob worker got error %s, exiting\n", e.name()); + printf("Blob worker got error %s. Exiting...\n", e.name()); } TraceEvent("BlobWorkerDied", self->id).error(e, true); } - printf("cancelling actors for BW %s\n", self->id.toString().c_str()); - self->actors.clear(false); - // self->dead = true; return Void(); }
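
The main structural change in this patch is dropping the ActorCollection actors{ false } member from BlobWorkerData and instead sending long-running handlers (handleBlobGranuleFileRequest, handleRangeAssign, handleRangeRevoke, waitFailureServer, runCommitVersionChecks) onto the existing addActor PromiseStream, which the blobWorker main loop drains through a single actorCollection future via when(wait(collection)). The sketch below is a minimal illustration of that pattern, assuming FoundationDB's flow framework (ACTOR, loop/choose/when, actorCollection from flow/ActorCollection.h); exampleWorkerLoop and doSomeWork are hypothetical names for illustration, not code from this patch.

// Minimal sketch of the addActor/actorCollection pattern, assuming the
// FoundationDB flow framework; exampleWorkerLoop and doSomeWork are
// illustrative names only.
#include "flow/flow.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // this must be the last #include

ACTOR Future<Void> doSomeWork() {
	// Stand-in for a long-running request handler such as handleBlobGranuleFileRequest.
	wait(delay(0.1));
	return Void();
}

ACTOR Future<Void> exampleWorkerLoop(PromiseStream<Future<Void>> addActor) {
	// One actorCollection owns every future pushed onto addActor; waiting on it
	// propagates the first error thrown by any collected actor.
	state Future<Void> collection = actorCollection(addActor.getFuture());
	loop choose {
		when(wait(delay(1.0))) {
			// Instead of holding an ActorCollection member on a shared data struct,
			// handlers are handed off through the promise stream.
			addActor.send(doSomeWork());
		}
		when(wait(collection)) {
			// The collection never completes normally while the stream stays open,
			// so reaching this body is unexpected; errors from collected actors are
			// rethrown by the wait itself.
			ASSERT(false);
			throw internal_error();
		}
	}
}

Because the collection future is local to the worker actor rather than a member of the reference-counted BlobWorkerData, the collected actors are cancelled automatically when blobWorker returns or is cancelled, which appears to be why the explicit self->actors.clear(false) call and the dead/cancellation bookkeeping can be deleted by this patch.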