Add debugging.

parent 2ccb3a4740
commit 310d990b12

fdbclient/NativeAPI.actor.cpp
@@ -7831,6 +7831,276 @@ Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version versio
    return popChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, version);
}

#define BG_REQUEST_DEBUG true

ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db,
                                                   PromiseStream<KeyRange> results,
                                                   KeyRange keyRange) {
    // FIXME: use streaming range read
    state Database cx(db);
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
    state KeyRange currentRange = keyRange;
    if (BG_REQUEST_DEBUG) {
        printf("Getting Blob Granules for [%s - %s)\n",
               keyRange.begin.printable().c_str(),
               keyRange.end.printable().c_str());
    }
    loop {
        try {
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            state RangeResult blobGranuleMapping = wait(krmGetRanges(
                tr, blobGranuleMappingKeys.begin, currentRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));

            for (int i = 0; i < blobGranuleMapping.size() - 1; i++) {
                if (blobGranuleMapping[i].value.size()) {
                    results.send(KeyRangeRef(blobGranuleMapping[i].key, blobGranuleMapping[i + 1].key));
                }
            }
            if (blobGranuleMapping.more) {
                currentRange = KeyRangeRef(blobGranuleMapping.back().key, currentRange.end);
            } else {
                results.sendError(end_of_stream());
                return Void();
            }
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
}

Future<Void> DatabaseContext::getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range) {
    if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
        throw client_invalid_operation();
    }
    return getBlobGranuleRangesStreamActor(Reference<DatabaseContext>::addRef(this), results, range);
}

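A caller drains the PromiseStream until the actor signals end_of_stream. A minimal consumption sketch (illustrative only, not part of this commit; printGranuleRanges is a hypothetical helper and assumes the usual fdbclient actor-compiler environment):

    ACTOR Future<Void> printGranuleRanges(Database cx, KeyRange range) {
        // Hypothetical helper: drains the granule-range stream and prints each range.
        state PromiseStream<KeyRange> ranges;
        state Future<Void> stream = cx->getBlobGranuleRangesStream(ranges, range);
        try {
            loop {
                KeyRange r = waitNext(ranges.getFuture());
                printf("[%s - %s)\n", r.begin.printable().c_str(), r.end.printable().c_str());
            }
        } catch (Error& e) {
            // end_of_stream is the normal termination signal sent by the actor above
            if (e.code() != error_code_end_of_stream) {
                throw;
            }
        }
        return Void();
    }
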
// hack (for now) to get blob worker interface into load balance
struct BWLocationInfo : MultiInterface<ReferencedInterface<BlobWorkerInterface>> {
    using Locations = MultiInterface<ReferencedInterface<BlobWorkerInterface>>;
    explicit BWLocationInfo(const std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>>& v) : Locations(v) {}
};

ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db,
                                               PromiseStream<Standalone<BlobGranuleChunkRef>> results,
                                               KeyRange range,
                                               Version begin,
                                               Optional<Version> end) { // end not present is just latest
    state Database cx(db);
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
    state RangeResult blobGranuleMapping;
    state Version endVersion;
    state Key granuleStartKey;
    state Key granuleEndKey;
    state KeyRange keyRange = range;
    state int i, loopCounter = 0;
    state UID workerId;
    loop {
        try {
            // FIXME: Use streaming parallelism?
            // Read mapping and worker interfaces from DB
            loopCounter++;
            loop {
                try {
                    tr->reset();
                    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                    if (loopCounter == 1) {
                        // if retrying, use new version for mapping but original version for read version
                        if (end.present()) {
                            endVersion = end.get();
                        } else {
                            Version _end = wait(tr->getReadVersion());
                            endVersion = _end;
                        }
                    }

                    // Right now just read whole blob range assignments from DB
                    // FIXME: eventually we probably want to cache this and invalidate similarly to storage servers.
                    // Cache misses could still read from the DB, or we could add it to the Transaction State Store and
                    // have proxies serve it from memory.
                    RangeResult _bgMapping = wait(krmGetRanges(
                        tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
                    blobGranuleMapping = _bgMapping;
                    if (blobGranuleMapping.more) {
                        if (BG_REQUEST_DEBUG) {
                            // printf("BG Mapping for [%s - %s) too large!\n");
                        }
                        throw unsupported_operation();
                    }
                    ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);

                    if (blobGranuleMapping.size() == 0) {
                        if (BG_REQUEST_DEBUG) {
                            printf("no blob worker assignments yet \n");
                        }
                        throw transaction_too_old();
                    }

                    if (BG_REQUEST_DEBUG) {
                        fmt::print("Doing blob granule request @ {}\n", endVersion);
                        fmt::print("blob worker assignments:\n");
                    }

                    for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
                        granuleStartKey = blobGranuleMapping[i].key;
                        granuleEndKey = blobGranuleMapping[i + 1].key;
                        if (!blobGranuleMapping[i].value.size()) {
                            if (BG_REQUEST_DEBUG) {
                                printf("Key range [%s - %s) missing worker assignment!\n",
                                       granuleStartKey.printable().c_str(),
                                       granuleEndKey.printable().c_str());
                                // TODO probably new exception type instead
                            }
                            throw transaction_too_old();
                        }

                        workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
                        if (BG_REQUEST_DEBUG) {
                            printf(" [%s - %s): %s\n",
                                   granuleStartKey.printable().c_str(),
                                   granuleEndKey.printable().c_str(),
                                   workerId.toString().c_str());
                        }

                        if (!cx->blobWorker_interf.count(workerId)) {
                            Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
                            // from the time the mapping was read from the db, the associated blob worker
                            // could have died and so its interface wouldn't be present as part of the blobWorkerList
                            // we persist in the db. So throw wrong_shard_server to get the new mapping
                            if (!workerInterface.present()) {
                                throw wrong_shard_server();
                            }
                            cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get());
                            if (BG_REQUEST_DEBUG) {
                                printf(" decoded worker interface for %s\n", workerId.toString().c_str());
                            }
                        }
                    }
                    break;
                } catch (Error& e) {
                    wait(tr->onError(e));
                }
            }

            // Make request for each granule
            for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
                granuleStartKey = blobGranuleMapping[i].key;
                granuleEndKey = blobGranuleMapping[i + 1].key;
                // if this was a time travel and the request returned larger bounds, skip this chunk
                if (granuleEndKey <= keyRange.begin) {
                    continue;
                }
                workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
                // prune first/last granules to requested range
                if (keyRange.begin > granuleStartKey) {
                    granuleStartKey = keyRange.begin;
                }
                if (keyRange.end < granuleEndKey) {
                    granuleEndKey = keyRange.end;
                }

                state BlobGranuleFileRequest req;
                req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
                req.beginVersion = begin;
                req.readVersion = endVersion;

                std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
                v.push_back(makeReference<ReferencedInterface<BlobWorkerInterface>>(cx->blobWorker_interf[workerId]));
                state Reference<MultiInterface<ReferencedInterface<BlobWorkerInterface>>> location =
                    makeReference<BWLocationInfo>(v);
                // use load balance with one option for now for retry and error handling
                BlobGranuleFileReply rep = wait(loadBalance(location,
                                                            &BlobWorkerInterface::blobGranuleFileRequest,
                                                            req,
                                                            TaskPriority::DefaultPromiseEndpoint,
                                                            AtMostOnce::False,
                                                            nullptr));

                if (BG_REQUEST_DEBUG) {
                    fmt::print("Blob granule request for [{0} - {1}) @ {2} - {3} got reply from {4}:\n",
                               granuleStartKey.printable(),
                               granuleEndKey.printable(),
                               begin,
                               endVersion,
                               workerId.toString());
                }
                for (auto& chunk : rep.chunks) {
                    if (BG_REQUEST_DEBUG) {
                        printf("[%s - %s)\n",
                               chunk.keyRange.begin.printable().c_str(),
                               chunk.keyRange.end.printable().c_str());

                        printf(" SnapshotFile:\n %s\n",
                               chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
                        printf(" DeltaFiles:\n");
                        for (auto& df : chunk.deltaFiles) {
                            printf(" %s\n", df.toString().c_str());
                        }
                        printf(" Deltas: (%d)", chunk.newDeltas.size());
                        if (chunk.newDeltas.size() > 0) {
                            fmt::print(" with version [{0} - {1}]",
                                       chunk.newDeltas[0].version,
                                       chunk.newDeltas[chunk.newDeltas.size() - 1].version);
                        }
                        fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
                        printf("\n\n");
                    }
                    Arena a;
                    a.dependsOn(rep.arena);
                    results.send(Standalone<BlobGranuleChunkRef>(chunk, a));
                    keyRange = KeyRangeRef(chunk.keyRange.end, keyRange.end);
                }
            }
            results.sendError(end_of_stream());
            return Void();
        } catch (Error& e) {
            if (e.code() == error_code_actor_cancelled) {
                throw;
            }
            if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
                e.code() == error_code_connection_failed) {
                // TODO would invalidate mapping cache here if we had it
                wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
            } else {
                if (BG_REQUEST_DEBUG) {
                    printf("blob granule file request got unexpected error %s\n", e.name());
                }
                results.sendError(e);
                return Void();
            }
        }
    }
}

Future<Void> DatabaseContext::readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results,
                                                     KeyRange range,
                                                     Version begin,
                                                     Optional<Version> end) {
    if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
        throw client_invalid_operation();
    }
    return readBlobGranulesStreamActor(Reference<DatabaseContext>::addRef(this), results, range, begin, end);
}

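The chunk stream from readBlobGranulesStream is drained the same way as the range stream above, except the payload is a Standalone<BlobGranuleChunkRef> per granule. A sketch of only the differing part (illustrative, not from this commit; assumes the same actor context as the earlier sketch):

    state PromiseStream<Standalone<BlobGranuleChunkRef>> chunks;
    state Future<Void> stream = cx->readBlobGranulesStream(chunks, range, 0, Optional<Version>()); // end absent = latest
    try {
        loop {
            Standalone<BlobGranuleChunkRef> chunk = waitNext(chunks.getFuture());
            // materialize chunk.snapshotFile plus chunk.deltaFiles and chunk.newDeltas here
        }
    } catch (Error& e) {
        if (e.code() != error_code_end_of_stream) {
            throw;
        }
    }
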
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
    state ReadYourWritesTransaction tr(cx);
    loop {
        try {
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            if (lockAware) {
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            }

            tr.set(perpetualStorageWiggleKey, enable ? "1"_sr : "0"_sr);
            wait(tr.commit());
            break;
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
    return Void();
}

Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
    return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
}

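Usage is a single call from any actor holding a Database; a sketch (the call site is hypothetical):

    wait(setPerpetualStorageWiggle(cx, true, LockAware::True)); // enable the wiggle even if the cluster is locked
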
fdbclient/ServerKnobs.cpp
@@ -786,7 +786,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( randomize && BUGGIFY ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; }
    init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
    init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );
-   init( BG_PRUNE_TIMEOUT, 60 * 60);
+   init( BG_PRUNE_TIMEOUT, 60*60);

    init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
    init( BLOB_WORKERLIST_FETCH_INTERVAL, 1.0 );

fdbserver/BlobManager.actor.cpp
@@ -18,6 +18,7 @@
 * limitations under the License.
 */

+#include <sstream>
#include <queue>
#include <vector>
#include <unordered_map>

@@ -1412,30 +1413,49 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(BlobManagerData* bmData, UID granule
 * also removes the history entry for this granule from the system keyspace
 */
ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyRef historyKey) {
    if (BM_DEBUG) {
        printf("Fully deleting granule %s: init\n", granuleId.toString().c_str());
    }

    state Transaction tr(self->db);
    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

    KeyRange filesRange = blobGranuleFileKeyRangeFor(granuleId);

    // get files
    GranuleFiles files = wait(loadHistoryFiles(&tr, granuleId, BM_DEBUG));

    std::vector<Future<Void>> deletions;
    std::vector<std::string> filesToDelete; // TODO: remove, just for debugging

    for (auto snapshotFile : files.snapshotFiles) {
        std::string fname = snapshotFile.filename;
        deletions.emplace_back(self->bstore->deleteFile(fname));
        filesToDelete.emplace_back(fname);
    }

    for (auto deltaFile : files.deltaFiles) {
        std::string fname = deltaFile.filename;
        deletions.emplace_back(self->bstore->deleteFile(fname));
        filesToDelete.emplace_back(fname);
    }

    if (BM_DEBUG) {
        printf("Fully deleting granule %s: deleting %d files\n", granuleId.toString().c_str(), deletions.size());
        for (auto filename : filesToDelete) {
            printf(" - %s\n", filename.c_str());
        }
    }

    // delete the files before the corresponding metadata.
    // this could lead to dangling pointers in fdb, but this granule should
    // never be read again anyways, and we can clean up the keys the next time around.
    // deleting files before corresponding metadata reduces the # of orphaned files.
    wait(waitForAll(deletions));

    // delete metadata in FDB (history entry and file keys)
    if (BM_DEBUG) {
        printf("Fully deleting granule %s: deleting history and file keys\n", granuleId.toString().c_str());
    }
    loop {
        try {
            KeyRange fileRangeKey = blobGranuleFileKeyRangeFor(granuleId);

@@ -1448,6 +1468,10 @@ ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyR
        }
    }

    if (BM_DEBUG) {
        printf("Fully deleting granule %s: success\n", granuleId.toString().c_str());
    }

    return Void();
}

@@ -1456,51 +1480,88 @@ ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyR
 * version <= pruneVersion and deletes all files older than it.
 */
ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId, Version pruneVersion) {
    if (BM_DEBUG) {
        printf("Partially deleting granule %s: init\n", granuleId.toString().c_str());
    }

    state Transaction tr(self->db);
    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

    KeyRange filesRange = blobGranuleFileKeyRangeFor(granuleId);

    // get files
    GranuleFiles files = wait(loadHistoryFiles(&tr, granuleId, BM_DEBUG));

-   Version latestSnaphotVersion = invalidVersion;
+   // represents the version of the latest snapshot file in this granule with G.version < pruneVersion
+   Version latestSnapshotVersion = invalidVersion;

-   state std::vector<Future<Void>> deletions;
-   state std::vector<KeyRef> deletedFileKeys;
+   state std::vector<Future<Void>> deletions; // deletion work per file
+   state std::vector<Key> deletedFileKeys; // keys for deleted files
+   state std::vector<std::string> filesToDelete; // TODO: remove eventually, just for debugging

+   // TODO: binary search these snapshot files for latestSnapshotVersion
    for (int idx = files.snapshotFiles.size() - 1; idx >= 0; --idx) {
        // if we already found the latestSnapshotVersion, this snapshot can be deleted
-       if (latestSnaphotVersion != invalidVersion) {
+       if (latestSnapshotVersion != invalidVersion) {
            std::string fname = files.snapshotFiles[idx].filename;
            deletions.emplace_back(self->bstore->deleteFile(fname));
            deletedFileKeys.emplace_back(blobGranuleFileKeyFor(granuleId, 'S', files.snapshotFiles[idx].version));
+           filesToDelete.emplace_back(fname);
        } else if (files.snapshotFiles[idx].version <= pruneVersion) {
            // otherwise if this is the FIRST snapshot file with version < pruneVersion,
            // then we found our latestSnapshotVersion (FIRST since we are traversing in reverse)
-           latestSnaphotVersion = files.snapshotFiles[idx].version;
+           latestSnapshotVersion = files.snapshotFiles[idx].version;
        }
    }

-   ASSERT(latestSnaphotVersion != invalidVersion);
+   // we would have only partially deleted the granule if such a snapshot existed
+   ASSERT(latestSnapshotVersion != invalidVersion);

    // delete all delta files older than latestSnapshotVersion
    for (auto deltaFile : files.deltaFiles) {
-       if (deltaFile.version < latestSnaphotVersion) {
-           std::string fname = deltaFile.filename;
-           deletions.emplace_back(self->bstore->deleteFile(fname));
-           deletedFileKeys.emplace_back(blobGranuleFileKeyFor(granuleId, 'D', deltaFile.version));
+       // traversing in fwd direction, so stop once we find the first delta file past the latestSnapshotVersion
+       if (deltaFile.version > latestSnapshotVersion) {
+           break;
+       }
+
+       // otherwise deltaFile.version <= latestSnapshotVersion so delete it
+       // == should also be deleted because the last delta file before a snapshot would have the same version
+       std::string fname = deltaFile.filename;
+       deletions.emplace_back(self->bstore->deleteFile(fname));
+       deletedFileKeys.emplace_back(blobGranuleFileKeyFor(granuleId, 'D', deltaFile.version));
+       filesToDelete.emplace_back(fname);
    }

+   if (BM_DEBUG) {
+       printf("Partially deleting granule %s: deleting %d files\n", granuleId.toString().c_str(), deletions.size());
+       for (auto filename : filesToDelete) {
+           printf(" - %s\n", filename.c_str());
+       }
+   }

-   printf("partial deletion: deleting %d files\n", deletions.size());
+   // TODO: the following comment relies on the assumption that BWs will not get requests to
+   // read data that was already pruned. confirm assumption is fine. otherwise, we'd need
+   // to communicate with BWs here and have them ack the pruneVersion

    // delete the files before the corresponding metadata.
    // this could lead to dangling pointers in fdb, but we should never read data older than
    // pruneVersion anyways, and we can clean up the keys the next time around.
    // deleting files before corresponding metadata reduces the # of orphaned files.
    wait(waitForAll(deletions));

    // delete metadata in FDB (deleted file keys)
+   // TODO: do we need to also update the start version for the history entry?
+   // it would be a blind write here so might that cause a problem with history traversal in BW?
+   // do we gain any benefit from updating it? even if we keep the old start version, the worst is
+   // someone requests a version in [oldStartVersion, newStartVersion) and we fail to return
+   // any files for that request.

    if (BM_DEBUG) {
        printf("Partially deleting granule %s: deleting file keys\n", granuleId.toString().c_str());
    }

    loop {
        try {
-           for (auto key : deletedFileKeys) {
+           for (auto& key : deletedFileKeys) {
                tr.clear(key);
            }
            wait(tr.commit());

@@ -1509,6 +1570,10 @@ ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId,
            wait(tr.onError(e));
        }
    }

    if (BM_DEBUG) {
        printf("Partially deleting granule %s: success\n", granuleId.toString().c_str());
    }
    return Void();
}

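The selection rule partiallyDeleteGranule implements (keep the newest snapshot at or below pruneVersion, delete every older snapshot, and delete every delta file at or below the kept snapshot's version) can be exercised in isolation. A standalone C++ sketch with stand-in types; FileEntry and the -1 invalidVersion are illustrative, this is not the BlobManager code:

    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct FileEntry { int64_t version; };

    // Collects indices of snapshot/delta files that partial pruning would remove,
    // given file lists sorted by version ascending.
    static void selectForPartialDelete(const std::vector<FileEntry>& snapshots,
                                       const std::vector<FileEntry>& deltas,
                                       int64_t pruneVersion,
                                       std::vector<size_t>& snapshotsToDelete,
                                       std::vector<size_t>& deltasToDelete) {
        int64_t latestSnapshotVersion = -1; // stand-in for invalidVersion
        for (int idx = (int)snapshots.size() - 1; idx >= 0; --idx) {
            if (latestSnapshotVersion != -1) {
                snapshotsToDelete.push_back(idx); // strictly older than the kept snapshot
            } else if (snapshots[idx].version <= pruneVersion) {
                latestSnapshotVersion = snapshots[idx].version; // first hit in reverse = newest eligible
            }
        }
        assert(latestSnapshotVersion != -1); // a covering snapshot must exist to prune partially
        for (size_t i = 0; i < deltas.size(); i++) {
            if (deltas[i].version > latestSnapshotVersion) {
                break; // sorted ascending, so nothing later qualifies
            }
            deltasToDelete.push_back(i); // == is deleted too; the kept snapshot covers it
        }
    }

    int main() {
        std::vector<FileEntry> snaps = { { 10 }, { 20 }, { 30 } };
        std::vector<FileEntry> deltas = { { 12 }, { 20 }, { 25 }, { 31 } };
        std::vector<size_t> s, d;
        selectForPartialDelete(snaps, deltas, 25, s, d); // keeps the snapshot at version 20
        assert(s.size() == 1 && snaps[s[0]].version == 10);
        assert(d.size() == 2); // deltas at versions 12 and 20
        return 0;
    }
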
@@ -1519,11 +1584,15 @@ ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId,
 * and nodes that can be partially deleted (i.e. some of their files can be deleted).
 * Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done
 * processing this prune intent.
 *
+ * TODO: communicate the prune to blob workers so they can clean up local memory
+ * maybe BWs can just watch the prune keys as well!
 */
ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef endKey, Version pruneVersion, bool force) {
+   if (BM_DEBUG) {
+       printf("pruneRange starting for range [%s-%s) @ pruneVersion=%lld, force=%s\n",
+              startKey.printable().c_str(),
+              endKey.printable().c_str(),
+              pruneVersion,
+              force ? "T" : "F");
+   }

    // queue of <range, startVersion, endVersion> for BFS traversal of history
    // TODO: consider using GranuleHistoryEntry, but that also makes it a little messy
    state std::queue<std::tuple<KeyRange, Version, Version>> historyEntryQueue;

@@ -1546,28 +1615,35 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end

    state KeyRangeMap<UID>::iterator activeRange;
    for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
-       // assumption: prune boundaries must respect granule boundaries
-       printf("looping over active range [%s-%s)=%s\n",
-              activeRange.begin().printable().c_str(),
-              activeRange.end().printable().c_str(),
-              activeRange.value().toString().c_str());
+       if (BM_DEBUG) {
+           printf("Checking if active range [%s-%s), owned by BW %s, should be pruned\n",
+                  activeRange.begin().printable().c_str(),
+                  activeRange.end().printable().c_str(),
+                  activeRange.value().toString().c_str());
+       }

        // ASSERT(activeRange.begin() >= startKey && activeRange.end() < endKey);
+       // assumption: prune boundaries must respect granule boundaries
        if (activeRange.begin() < startKey || activeRange.end() > endKey) {
            continue;
        }

        // TODO: if this is a force prune, then revoke the assignment from the corresponding BW first
        // so that it doesn't try to interact with the granule (i.e. force it to give up gLock).
        // we'll need some way to ack that the revoke was successful

        loop {
            try {
-               printf("fetching latest history for worker assignment [%s-%s)=%s\n",
-                      activeRange.begin().printable().c_str(),
-                      activeRange.end().printable().c_str(),
-                      activeRange.value().toString().c_str());
+               if (BM_DEBUG) {
+                   printf("Fetching latest history entry for range [%s-%s)\n",
+                          activeRange.begin().printable().c_str(),
+                          activeRange.end().printable().c_str());
+               }
                Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
                // ASSERT(history.present());
                // TODO: can we tell from the krm that this range is not valid, so that we don't need to do a get
                if (history.present()) {
-                   printf("pushing onto history queue\n");
+                   if (BM_DEBUG) {
+                       printf("Adding range to history queue\n");
+                   }
                    historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
                }
                break;

@@ -1577,7 +1653,9 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
        }
    }

-   printf("starting to go through history queue\n");
+   if (BM_DEBUG) {
+       printf("Beginning BFS traversal of history\n");
+   }
    while (!historyEntryQueue.empty()) {
        // process the node at the front of the queue and remove it
        KeyRange currRange;

@@ -1586,6 +1664,14 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
        std::tie(currRange, startVersion, endVersion) = historyEntryQueue.front();
        historyEntryQueue.pop();

        if (BM_DEBUG) {
            printf("Processing history node [%s-%s) with versions [%lld, %lld)\n",
                   currRange.begin.printable().c_str(),
                   currRange.end.printable().c_str(),
                   startVersion,
                   endVersion);
        }

        // get the persisted history entry for this granule
        state Standalone<BlobGranuleHistoryValue> currHistoryNode;
        state KeyRef historyKey = blobGranuleHistoryKeyFor(currRange, startVersion);

@@ -1600,8 +1686,16 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
            }
        }

        if (BM_DEBUG) {
            printf("Found history entry for this node. It's granuleID is %s\n",
                   currHistoryNode.granuleID.toString().c_str());
        }

        // if we already saw this node, skip it; otherwise, mark it as visited
        if (visited.count(currHistoryNode.granuleID)) {
            if (BM_DEBUG) {
                printf("Already processed %s, so skipping it\n", currHistoryNode.granuleID.toString().c_str());
            }
            continue;
        }
        visited.insert(currHistoryNode.granuleID);

@@ -1609,18 +1703,32 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
        // There are three cases this granule can fall into:
        // - if the granule's end version is at or before the prune version or this is a force delete,
        //   this granule should be completely deleted
-       // - else if the startVersion <= pruneVersion, then G.startVersion <= pruneVersion < G.endVersion
+       // - else if the startVersion <= pruneVersion, then G.startVersion < pruneVersion < G.endVersion
        //   and so this granule should be partially deleted
        // - otherwise, this granule is active, so don't schedule it for deletion
        if (force || endVersion <= pruneVersion) {
            if (BM_DEBUG) {
                printf("Granule %s will be FULLY deleted\n", currHistoryNode.granuleID.toString().c_str());
            }
            toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey });
-       } else if (startVersion <= pruneVersion) {
+       } else if (startVersion < pruneVersion) {
            if (BM_DEBUG) {
                printf("Granule %s will be partially deleted\n", currHistoryNode.granuleID.toString().c_str());
            }
            toPartiallyDelete.push_back({ currHistoryNode.granuleID });
        }

        // add all of the node's parents to the queue
        for (auto& parent : currHistoryNode.parentGranules) {
-           // the parent's end version is this node's startVersion
+           // the parent's end version is this node's startVersion,
+           // since this node must have started where it's parent finished
            if (BM_DEBUG) {
                printf("Adding parent [%s-%s) with versions [%lld-%lld) to queue\n",
                       parent.first.begin.printable().c_str(),
                       parent.first.end.printable().c_str(),
                       parent.second,
                       startVersion);
            }
            historyEntryQueue.push({ parent.first, parent.second, startVersion });
        }
    }

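The traversal above is a plain breadth-first search over the granule-history DAG with a visited set, so a granule reachable through several children is processed once. The same shape in standalone form (stand-in types, not the BlobManager code):

    #include <cstdint>
    #include <cstdio>
    #include <queue>
    #include <set>
    #include <vector>

    struct HistoryNode {
        uint64_t granuleID;
        std::vector<std::pair<uint64_t, int64_t>> parents; // (parent granuleID, parent startVersion)
    };

    // visit() is called exactly once per granule, even when several paths reach it.
    template <class Lookup, class Visit>
    void bfsHistory(uint64_t rootID, int64_t rootStart, Lookup&& lookup, Visit&& visit) {
        std::queue<std::pair<uint64_t, int64_t>> q; // (granuleID, startVersion)
        std::set<uint64_t> visited;
        q.push({ rootID, rootStart });
        while (!q.empty()) {
            auto [id, startVersion] = q.front();
            q.pop();
            if (!visited.insert(id).second) {
                continue; // already processed this node, skip it
            }
            const HistoryNode& node = lookup(id);
            visit(node, startVersion);
            for (auto& [parentID, parentStart] : node.parents) {
                // the parent's end version is this node's start version
                q.push({ parentID, parentStart });
            }
        }
    }

    int main() {
        std::vector<HistoryNode> g = { { 0, { { 1, 5 } } }, { 1, {} } };
        bfsHistory(0, 10,
                   [&](uint64_t id) -> const HistoryNode& { return g[id]; },
                   [](const HistoryNode& n, int64_t sv) {
                       std::printf("granule %llu start %lld\n", (unsigned long long)n.granuleID, (long long)sv);
                   });
    }
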
@@ -1632,23 +1740,34 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
    // any node that must be partially deleted must occur later on in the history. Thus,
    // we delete the 'toFullyDelete' granules first.
    //
-   // Unfortunately we can't do multiple deletions in parallel because they might
-   // race and we'll end up with unreachable nodes in the case of a crash
+   // Unfortunately we can't do parallelize _full_ deletions because they might
+   // race and we'll end up with unreachable nodes in the case of a crash.
+   // Since partial deletions only occur for "leafs", they can be done in parallel

    state int i;
-   printf("%d granules to fully delete\n", toFullyDelete.size());
+   if (BM_DEBUG) {
+       printf("%d granules to fully delete\n", toFullyDelete.size());
+   }
    for (i = toFullyDelete.size() - 1; i >= 0; --i) {
        UID granuleId;
        KeyRef historyKey;
        std::tie(granuleId, historyKey) = toFullyDelete[i];
        // FIXME: consider batching into a single txn (need to take care of txn size limit)
        if (BM_DEBUG) {
            printf("About to fully delete granule %s\n", granuleId.toString().c_str());
        }
        wait(fullyDeleteGranule(self, granuleId, historyKey));
    }

+   if (BM_DEBUG) {
+       printf("%d granules to partially delete\n", toPartiallyDelete.size());
+   }
    std::vector<Future<Void>> partialDeletions;
-   printf("%d granules to partially delete\n", toPartiallyDelete.size());
    for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
        UID granuleId = toPartiallyDelete[i];
        if (BM_DEBUG) {
            printf("About to partially delete granule %s\n", granuleId.toString().c_str());
        }
        partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, pruneVersion));
    }

@@ -1660,6 +1779,9 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
    // If that is the case, we should not clear the key. Otherwise, we can just clear the key.

    tr.reset();
    if (BM_DEBUG) {
        printf("About to clear prune intent\n");
    }
    loop {
        try {
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

@@ -1679,21 +1801,20 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
            }
            break;
        } catch (Error& e) {
-           printf("pruneRange got error %s\n", e.name());
+           printf("Attempt to clear prune intent got error %s\n", e.name());
            wait(tr.onError(e));
        }
    }

    if (BM_DEBUG) {
        printf("Successfully pruned range [%s-%s) at pruneVersion=%lld\n",
               startKey.printable().c_str(),
               endKey.printable().c_str(),
               pruneVersion);
    }
    return Void();
}

/*
 TODO: We need to revoke range from BW so that it doesn't try to add to a granule that we dropped
 Will SnowTram reuse table IDs; do we unhybridize a range once it's been revoked/dropped?

 Or can we possibly make the BW give it up upon seeing a change in the watch?
*/

/*
 * This monitor watches for changes to a key K that gets updated whenever there is a new prune intent.
 * On this change, we scan through all blobGranulePruneKeys (which look like <startKey, endKey>=<prune_version,

@@ -1731,26 +1852,61 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

-               Optional<Value> newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey));
+               state Optional<Value> newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey));

                // if the value at the change key has changed, that means there is new work to do
                if (newPruneWatchVal.present() && oldPruneWatchVal != newPruneWatchVal.get()) {
                    oldPruneWatchVal = newPruneWatchVal.get();
-                   printf("old and new watch don't match\n");
+                   if (BM_DEBUG) {
+                       printf("the blobGranulePruneChangeKey changed\n");
+                   }
+
+                   // TODO: debugging code, remove it
+                   if (newPruneWatchVal.get().toString().substr(0, 6) == "random") {
+                       state Reference<ReadYourWritesTransaction> dummy =
+                           makeReference<ReadYourWritesTransaction>(self->db);
+                       loop {
+                           try {
+                               dummy->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                               dummy->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+                               std::istringstream iss(newPruneWatchVal.get().toString().substr(6));
+                               Version version;
+                               iss >> version;
+                               dummy->set(blobGranulePruneKeys.begin.withSuffix(normalKeys.begin),
+                                          blobGranulePruneValueFor(version, false));
+                               wait(dummy->commit());
+                               break;
+
+                           } catch (Error& e) {
+                               wait(dummy->onError(e));
+                           }
+                       }
+                   }
                    break;
                }

                // otherwise, there are no changes and we should wait until the next change (or timeout)
                state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
                wait(tr->commit());

+               if (BM_DEBUG) {
+                   printf("monitorPruneKeys waiting for change or timeout\n");
+               }
+
                choose {
-                   when(wait(watchPruneIntentsChange)) { tr->reset(); }
+                   when(wait(watchPruneIntentsChange)) {
+                       if (BM_DEBUG) {
+                           printf("monitorPruneKeys saw a change\n");
+                       }
+                       tr->reset();
+                   }
                    when(wait(delay(SERVER_KNOBS->BG_PRUNE_TIMEOUT))) {
-                       printf("bg prune timeouts\n");
+                       if (BM_DEBUG) {
+                           printf("monitorPruneKeys got a timeout\n");
+                       }
                        break;
                    }
                }
                // wait(timeout(watchPruneIntentsChange, SERVER_KNOBS->BG_PRUNE_TIMEOUT, Void()));
            } catch (Error& e) {
                wait(tr->onError(e));
            }

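The choose/when above means: wake when the watch fires or when BG_PRUNE_TIMEOUT expires, whichever comes first. Outside flow, the closest analogy is a condition-variable wait with a deadline; a loose standalone sketch, not FDB code:

    #include <chrono>
    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <thread>

    std::mutex m;
    std::condition_variable cv;
    bool changed = false; // set by whoever plays the role of "writing the watch key"

    // Returns true if woken by a change, false if the timeout expired first.
    bool waitForChangeOrTimeout(std::chrono::seconds timeout) {
        std::unique_lock<std::mutex> lk(m);
        bool woke = cv.wait_for(lk, timeout, [] { return changed; });
        changed = false; // consume the notification, like re-arming the watch
        return woke;
    }

    int main() {
        std::thread writer([] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            { std::lock_guard<std::mutex> lk(m); changed = true; }
            cv.notify_one();
        });
        std::printf("%s\n", waitForChangeOrTimeout(std::chrono::seconds(5)) ? "change" : "timeout");
        writer.join();
    }
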
@@ -1758,23 +1914,10 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {

        tr->reset();

-       loop {
-           try {
-               tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-               tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-               Version dummyV = wait(tr->getReadVersion());
-               Value dummyValue = blobGranulePruneValueFor(dummyV, false);
-               wait(krmSetRange(tr, blobGranulePruneKeys.begin, normalKeys, dummyValue));
-               wait(tr->commit());
-               break;
-           } catch (Error& e) {
-               printf("dummy txn saw error %s\n", e.name());
-               wait(tr->onError(e));
-           }
-       }
+       if (BM_DEBUG) {
+           printf("Looping over prune intents\n");
+       }

        tr->reset();

        // loop through all prune intentions and do prune work accordingly
        state KeyRef beginKey = normalKeys.begin;
        loop {

@@ -1802,43 +1945,22 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
                    bool force;
                    std::tie(pruneVersion, force) = decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);

-                   // TODO: should we add this to an actor collection or a list of futures?
-                   // Probably because still need to handle the case of one prune at version V and then timer
-                   // expires and we start another prune again at version V. we need to keep track of what's in
-                   // progress. That brings another problem though: what happens if something is in progress and
-                   // fails... One way to prevent this is to not iterate over the prunes until the last iteration
-                   // is done (i.e waitForAll)
-                   //

-                   /*
-                   auto currPrunes = self->prunesInProgress.intersectingRanges(range);
-                   int count = 0;
-                   for (auto currPrune : currPrunes) {
-                       count++;
-                       if (currPrune.value() == pruneVersion) {
-                       }
-                   }
-                   ASSERT(currPrunes.() <= 1);
-                   */

                    printf("about to prune range [%s-%s) @ %d, force=%s\n",
                           rangeStartKey.printable().c_str(),
                           rangeEndKey.printable().c_str(),
                           pruneVersion,
                           force ? "T" : "F");
                    prunes.emplace_back(pruneRange(self, rangeStartKey, rangeEndKey, pruneVersion, force));

                    // TODO: maybe clear the key here if pruneRange succeeded, but then we'd have to wait here
                }

                // wait for this set of prunes to complete before starting the next ones since if we prune
                // a range R at version V and while we are doing that, the time expires, we will end up
-               // trying to prune the same range again since the work isn't finished
+               // trying to prune the same range again since the work isn't finished and the prunes will race
+               //
                // TODO: this isn't that efficient though. Instead we could keep metadata as part of the BM's
-               // memory that tracks which prunes are active. Once done, we can mark that work as done. If the BM
-               // fails then all prunes will fail and so the next BM will have a clear set of metadata (i.e. no
-               // work in progress) so we will end up doing the work in the new BM
+               // memory that tracks which prunes are active. Once done, we can mark that work as done. If the
+               // BM fails then all prunes will fail and so the next BM will have a clear set of metadata (i.e.
+               // no work in progress) so we will end up doing the work in the new BM
                wait(waitForAll(prunes));

                if (!pruneIntents.more) {

@@ -1851,7 +1973,9 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
                wait(tr->onError(e));
            }
        }
-       printf("done pruning all ranges. looping back\n");
+       if (BM_DEBUG) {
+           printf("Done pruning current set of prune intents.\n");
+       }
    }
} catch (Error& e) {
    if (BM_DEBUG) {

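Nearly every print this commit adds follows the same if (BM_DEBUG) printf(...) shape. A conventional alternative, sketched here for comparison rather than taken from this commit, is a compile-time-gated macro so each call site stays one line:

    #include <cstdio>

    #define BM_DEBUG true

    // Collapses to nothing when BM_DEBUG is false, but the arguments
    // still get type-checked in both configurations.
    #define BM_DEBUG_PRINT(...)                                                                                        \
        do {                                                                                                           \
            if (BM_DEBUG) {                                                                                            \
                std::printf(__VA_ARGS__);                                                                              \
            }                                                                                                          \
        } while (0)

    int main() {
        BM_DEBUG_PRINT("pruning %d granules\n", 3);
    }

The repeated if-blocks the commit uses have the same effect; the macro is purely a brevity trade-off.
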
fdbserver/BlobWorker.actor.cpp
@@ -45,7 +45,7 @@
#include "flow/actorcompiler.h" // has to be last include

#define BW_DEBUG true
-#define BW_REQUEST_DEBUG false
+#define BW_REQUEST_DEBUG true

// represents a previous version of a granule, and optionally the files that compose it
struct GranuleHistoryEntry : NonCopyable, ReferenceCounted<GranuleHistoryEntry> {

@@ -250,7 +250,6 @@ ACTOR Future<Void> readAndCheckGranuleLock(Reference<ReadYourWritesTransaction>
    return Void();
}

-<<<<<<< HEAD
// used for "business logic" of both versions of loading granule files
ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, GranuleFiles* files, UID granuleID) {

@@ -475,9 +474,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
    wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));

    // TODO some sort of directory structure would be useful?
-   state std::string fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
-                             std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(currentDeltaVersion) +
-                             ".delta";
+   state std::string fname = granuleID.toString() + "_T" + std::to_string((uint64_t)(1000.0 * now())) + "_V" +
+                             std::to_string(currentDeltaVersion) + ".delta";

    state Value serialized = ObjectWriter::toValue(deltasToWrite, Unversioned());

@@ -576,8 +574,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
                                          PromiseStream<RangeResult> rows,
                                          bool createGranuleHistory) {
    // TODO some sort of directory structure would be useful maybe?
-   state std::string fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
-                             std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(version) + ".snapshot";
+   state std::string fname = granuleID.toString() + "_T" + std::to_string((uint64_t)(1000.0 * now())) + "_V" +
+                             std::to_string(version) + ".snapshot";
    state Arena arena;
    state GranuleSnapshot snapshot;

@@ -1243,6 +1241,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,

    ASSERT(metadata->readable.canBeSet());
    metadata->readable.send(Void());
    printf("Got change feed stream\n");

    loop {
        // check outstanding snapshot/delta files for completion

@@ -1354,6 +1353,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
                                  metadata->keyRange);
            }

            printf("About to process mutations\n");

            // process mutations
            if (!mutations.empty()) {
                bool processedAnyMutations = false;

@@ -1952,6 +1953,11 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
}

ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
    printf("BW %s got blobGranuleFileRequest for range [%s-%s) @ %lld\n",
           bwData->id.toString().c_str(),
           req.keyRange.begin.printable().c_str(),
           req.keyRange.end.printable().c_str(),
           req.readVersion);
    try {
        // TODO REMOVE in api V2
        ASSERT(req.beginVersion == 0);

@@ -2011,10 +2017,13 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
            throw wrong_shard_server();
        }

        printf("is readable\n");

        state KeyRange chunkRange;
        state GranuleFiles chunkFiles;

        if (metadata->initialSnapshotVersion > req.readVersion) {
            printf("time travel query\n");
            // this is a time travel query, find previous granule
            if (metadata->historyLoaded.canBeSet()) {
                choose {

@@ -2092,6 +2101,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
            ASSERT(chunkFiles.deltaFiles.back().version > req.readVersion);
            ASSERT(chunkFiles.snapshotFiles.front().version <= req.readVersion);
        } else {

            // this is an active granule query
            loop {
                if (!metadata->activeCFData.get().isValid()) {

@@ -2739,6 +2749,107 @@ ACTOR Future<Void> monitorRemoval(Reference<BlobWorkerData> bwData) {
    }
}

ACTOR Future<Void> monitorPruneKeys(Reference<BlobWorkerData> self) {
    try {
        state Value oldPruneWatchVal;
        loop {
            state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

            // Wait for the watch to change, or some time to expire (whichever comes first)
            // before checking through the prune intents. We write a UID into the change key value
            // so that we can still recognize when the watch key has been changed while we weren't
            // monitoring it
            loop {
                try {
                    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                    tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

                    Optional<Value> newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey));

                    // if the value at the change key has changed, that means there is new work to do
                    if (newPruneWatchVal.present() && oldPruneWatchVal != newPruneWatchVal.get()) {
                        oldPruneWatchVal = newPruneWatchVal.get();
                        printf("old and new watch don't match\n");
                        break;
                    }

                    // otherwise, there are no changes and we should wait until the next change (or timeout)
                    state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
                    wait(tr->commit());
                    printf("About to wait for change or timeout\n");
                    choose {
                        when(wait(watchPruneIntentsChange)) { tr->reset(); }
                        when(wait(delay(SERVER_KNOBS->BG_PRUNE_TIMEOUT))) {
                            printf("bg prune timeouts\n");
                            break;
                        }
                    }
                    // wait(timeout(watchPruneIntentsChange, SERVER_KNOBS->BG_PRUNE_TIMEOUT, Void()));
                } catch (Error& e) {
                    wait(tr->onError(e));
                }
            }

            tr->reset();

            // loop through all prune intentions and do prune work accordingly
            state KeyRef beginKey = normalKeys.begin;
            loop {
                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

                state std::vector<Future<Void>> prunes;
                try {
                    // TODO: replace 10000 with a knob
                    KeyRange nextRange(KeyRangeRef(beginKey, normalKeys.end));
                    state RangeResult pruneIntents = wait(krmGetRanges(
                        tr, blobGranulePruneKeys.begin, nextRange, 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));

                    // TODO: would we miss a range [pruneIntents[9999], pruneIntents[10000]) because of the `more`?
                    // Or does `readThrough` take care of this? We also do this in recoverBlobManager
                    printf("pruneIntents.size()==%d\n", pruneIntents.size());
                    for (int rangeIdx = 0; rangeIdx < pruneIntents.size() - 1; ++rangeIdx) {
                        if (pruneIntents[rangeIdx].value.size() == 0) {
                            continue;
                        }
                        KeyRef rangeStartKey = pruneIntents[rangeIdx].key;
                        KeyRef rangeEndKey = pruneIntents[rangeIdx + 1].key;
                        KeyRange range(KeyRangeRef(rangeStartKey, rangeEndKey));
                        Version pruneVersion;
                        bool force;
                        std::tie(pruneVersion, force) = decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);

                        printf("about to prune range [%s-%s) @ %d, force=%s\n",
                               rangeStartKey.printable().c_str(),
                               rangeEndKey.printable().c_str(),
                               pruneVersion,
                               force ? "T" : "F");

                        // TODO: clear associated history
                    }

                    if (!pruneIntents.more) {
                        break;
                    }

                    beginKey = pruneIntents.readThrough.get();
                } catch (Error& e) {
                    // TODO: other errors here from pruneRange?
                    wait(tr->onError(e));
                }
            }
            printf("done pruning all ranges. looping back\n");
        }
    } catch (Error& e) {
        if (BW_DEBUG) {
            printf("monitorPruneKeys got error %s\n", e.name());
        }
        throw e;
    }
}

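Both monitorPruneKeys implementations page through prune intents 10000 rows at a time and resume from readThrough when more is set. The resume pattern in isolation (fetchPage and Page are hypothetical stand-ins for krmGetRanges and RangeResult, and FDB's exact readThrough semantics differ slightly):

    #include <cstdio>
    #include <string>
    #include <utility>
    #include <vector>

    struct Page {
        std::vector<std::pair<std::string, std::string>> rows; // key -> value
        bool more = false;
        std::string readThrough; // resume key when more == true
    };

    // Calls handle() on every row in [begin, end), one page at a time.
    template <class FetchPage, class Handle>
    void scanAll(std::string begin, const std::string& end, FetchPage&& fetchPage, Handle&& handle) {
        for (;;) {
            Page page = fetchPage(begin, end);
            for (auto& kv : page.rows) {
                handle(kv.first, kv.second);
            }
            if (!page.more) {
                break;
            }
            begin = page.readThrough; // resume exactly where the previous page stopped
        }
    }

    int main() {
        std::vector<std::pair<std::string, std::string>> data = { { "a", "1" }, { "b", "2" }, { "c", "3" } };
        auto fetchPage = [&](const std::string& begin, const std::string& end) {
            Page p;
            for (auto& kv : data) {
                if (kv.first >= begin && kv.first < end) {
                    p.rows.push_back(kv);
                    if (p.rows.size() == 2) { // page size 2 stands in for the 10000 row limit
                        p.more = true;
                        p.readThrough = kv.first + '\0'; // immediate successor of the last key returned
                        break;
                    }
                }
            }
            return p;
        };
        scanAll("", "zzz", fetchPage,
                [](const std::string& k, const std::string& v) { std::printf("%s=%s\n", k.c_str(), v.c_str()); });
    }
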
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
                              ReplyPromise<InitializeBlobWorkerReply> recruitReply,
                              Reference<AsyncVar<ServerDBInfo> const> dbInfo) {

@@ -2784,6 +2895,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
    recruitReply.send(rep);

    self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture()));
    self->addActor.send(monitorPruneKeys(self));
    state Future<Void> selfRemoved = monitorRemoval(self);

    TraceEvent("BlobWorkerInit", self->id);

fdbserver/MutationTracking.cpp
@@ -33,8 +33,14 @@
// keys in debugKeys and the ranges in debugRanges.
// Each entry is a pair of (label, keyOrRange) and the Label will be attached to the
// MutationTracking TraceEvent for easier searching/recognition.
-std::vector<std::pair<const char*, KeyRef>> debugKeys = { { "SomeKey", "foo"_sr } };
-std::vector<std::pair<const char*, KeyRangeRef>> debugRanges = { { "Everything", { ""_sr, "\xff\xff\xff\xff"_sr } } };
+std::vector<std::pair<const char*, KeyRef>> debugKeys = {};
+std::vector<std::pair<const char*, KeyRangeRef>> debugRanges = { { "PruneKeys",
+                                                                   { "\xff"
+                                                                     "\x02"
+                                                                     "/bgp/"_sr,
+                                                                     "\xff"
+                                                                     "\x02"
+                                                                     "/bgp0"_sr } } };

TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation, UID id) {
    const char* label = nullptr;

fdbserver/MutationTracking.h
@@ -25,7 +25,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"

-#define MUTATION_TRACKING_ENABLED 0
+#define MUTATION_TRACKING_ENABLED 1
// The keys to track are defined in the .cpp file to limit recompilation.

#define DEBUG_MUTATION(...) MUTATION_TRACKING_ENABLED&& debugMutation(__VA_ARGS__)

fdbserver/workloads/BlobGranuleVerifier.actor.cpp
@@ -27,6 +27,7 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"

@@ -59,6 +60,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
    int64_t timeTravelTooOld = 0;
    int64_t rowsRead = 0;
    int64_t bytesRead = 0;
    KeyRangeMap<Version> latestPruneVersions;
    std::vector<Future<Void>> clients;

    DatabaseConfiguration config;

@@ -305,11 +307,53 @@
        OldRead(KeyRange range, Version v, RangeResult oldResult) : range(range), v(v), oldResult(oldResult) {}
    };

    ACTOR Future<Void> pruneAtVersion(Database cx, KeyRange range, Version version, bool force) {
        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
        loop {
            try {
                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                Optional<Value> oldPruneIntent = wait(tr->get(blobGranulePruneKeys.begin.withSuffix(range.begin)));

                if (oldPruneIntent.present()) {
                    Version oldPruneVersion;
                    bool oldForce;
                    std::tie(oldPruneVersion, oldForce) = decodeBlobGranulePruneValue(oldPruneIntent.get());
                    if (oldPruneVersion >= version) {
                        return Void();
                    }
                }

                Value pruneValue = blobGranulePruneValueFor(version, force);
                wait(krmSetRange(tr, blobGranulePruneKeys.begin, range, pruneValue));
                tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString());
                wait(tr->commit());
                if (BGV_DEBUG) {
                    printf("pruneAtVersion for range [%s-%s) at version %lld succeeded\n",
                           range.begin.printable().c_str(),
                           range.end.printable().c_str(),
                           version);
                }
                return Void();
            } catch (Error& e) {
                if (BGV_DEBUG) {
                    printf("pruneAtVersion for range [%s-%s) at version %lld encountered error %s\n",
                           range.begin.printable().c_str(),
                           range.end.printable().c_str(),
                           version,
                           e.name());
                }
                wait(tr->onError(e));
            }
        }
    }

    ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) {
        state double last = now();
        state double endTime = last + self->testDuration;
        state std::map<double, OldRead> timeTravelChecks;
        state int64_t timeTravelChecksMemory = 0;
        state KeyRangeMap<Version> latestPruneVersions;

        TraceEvent("BlobGranuleVerifierStart");
        if (BGV_DEBUG) {

@@ -334,10 +378,18 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
            // advance iterator before doing read, so if it gets error we don't retry it

            try {
                // before reading, prune at some version [0, readVersion)
                Version pruneVersion = deterministicRandom()->randomInt(0, oldRead.v);
                wait(self->pruneAtVersion(cx, oldRead.range, pruneVersion, false));
                // FIXME: this doesn't actually guarantee that the prune executed. maybe add a delay?

                std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
                    wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
                self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
                self->timeTravelReads++;

                // TODO: read at some version older than pruneVersion and make sure you get txn_too_old
                // To achieve this, the BWs are going to have to recognize latest prune versions per granules
            } catch (Error& e) {
                if (e.code() == error_code_transaction_too_old) {
                    self->timeTravelTooOld++;