fix merge conflict
This commit is contained in:
parent
6ff7ad3c6a
commit
90a652ccfb
|
@ -7831,276 +7831,6 @@ Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version versio
|
|||
return popChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, version);
|
||||
}
|
||||
|
||||
#define BG_REQUEST_DEBUG false
|
||||
|
||||
ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db,
|
||||
PromiseStream<KeyRange> results,
|
||||
KeyRange keyRange) {
|
||||
// FIXME: use streaming range read
|
||||
state Database cx(db);
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state KeyRange currentRange = keyRange;
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf("Getting Blob Granules for [%s - %s)\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str());
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state RangeResult blobGranuleMapping = wait(krmGetRanges(
|
||||
tr, blobGranuleMappingKeys.begin, currentRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
|
||||
for (int i = 0; i < blobGranuleMapping.size() - 1; i++) {
|
||||
if (blobGranuleMapping[i].value.size()) {
|
||||
results.send(KeyRangeRef(blobGranuleMapping[i].key, blobGranuleMapping[i + 1].key));
|
||||
}
|
||||
}
|
||||
if (blobGranuleMapping.more) {
|
||||
currentRange = KeyRangeRef(blobGranuleMapping.back().key, currentRange.end);
|
||||
} else {
|
||||
results.sendError(end_of_stream());
|
||||
return Void();
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> DatabaseContext::getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range) {
|
||||
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
return getBlobGranuleRangesStreamActor(Reference<DatabaseContext>::addRef(this), results, range);
|
||||
}
|
||||
|
||||
// hack (for now) to get blob worker interface into load balance
|
||||
struct BWLocationInfo : MultiInterface<ReferencedInterface<BlobWorkerInterface>> {
|
||||
using Locations = MultiInterface<ReferencedInterface<BlobWorkerInterface>>;
|
||||
explicit BWLocationInfo(const std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>>& v) : Locations(v) {}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db,
|
||||
PromiseStream<Standalone<BlobGranuleChunkRef>> results,
|
||||
KeyRange range,
|
||||
Version begin,
|
||||
Optional<Version> end) { // end not present is just latest
|
||||
state Database cx(db);
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state RangeResult blobGranuleMapping;
|
||||
state Version endVersion;
|
||||
state Key granuleStartKey;
|
||||
state Key granuleEndKey;
|
||||
state KeyRange keyRange = range;
|
||||
state int i, loopCounter = 0;
|
||||
state UID workerId;
|
||||
loop {
|
||||
try {
|
||||
// FIXME: Use streaming parallelism?
|
||||
// Read mapping and worker interfaces from DB
|
||||
loopCounter++;
|
||||
loop {
|
||||
try {
|
||||
tr->reset();
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
if (loopCounter == 1) {
|
||||
// if retrying, use new version for mapping but original version for read version
|
||||
if (end.present()) {
|
||||
endVersion = end.get();
|
||||
} else {
|
||||
Version _end = wait(tr->getReadVersion());
|
||||
endVersion = _end;
|
||||
}
|
||||
}
|
||||
|
||||
// Right now just read whole blob range assignments from DB
|
||||
// FIXME: eventually we probably want to cache this and invalidate similarly to storage servers.
|
||||
// Cache misses could still read from the DB, or we could add it to the Transaction State Store and
|
||||
// have proxies serve it from memory.
|
||||
RangeResult _bgMapping = wait(krmGetRanges(
|
||||
tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
blobGranuleMapping = _bgMapping;
|
||||
if (blobGranuleMapping.more) {
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
// printf("BG Mapping for [%s - %s) too large!\n");
|
||||
}
|
||||
throw unsupported_operation();
|
||||
}
|
||||
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
if (blobGranuleMapping.size() == 0) {
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf("no blob worker assignments yet \n");
|
||||
}
|
||||
throw transaction_too_old();
|
||||
}
|
||||
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
fmt::print("Doing blob granule request @ {}\n", endVersion);
|
||||
fmt::print("blob worker assignments:\n");
|
||||
}
|
||||
|
||||
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
|
||||
granuleStartKey = blobGranuleMapping[i].key;
|
||||
granuleEndKey = blobGranuleMapping[i + 1].key;
|
||||
if (!blobGranuleMapping[i].value.size()) {
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf("Key range [%s - %s) missing worker assignment!\n",
|
||||
granuleStartKey.printable().c_str(),
|
||||
granuleEndKey.printable().c_str());
|
||||
// TODO probably new exception type instead
|
||||
}
|
||||
throw transaction_too_old();
|
||||
}
|
||||
|
||||
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf(" [%s - %s): %s\n",
|
||||
granuleStartKey.printable().c_str(),
|
||||
granuleEndKey.printable().c_str(),
|
||||
workerId.toString().c_str());
|
||||
}
|
||||
|
||||
if (!cx->blobWorker_interf.count(workerId)) {
|
||||
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
|
||||
// from the time the mapping was read from the db, the associated blob worker
|
||||
// could have died and so its interface wouldn't be present as part of the blobWorkerList
|
||||
// we persist in the db. So throw wrong_shard_server to get the new mapping
|
||||
if (!workerInterface.present()) {
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get());
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf(" decoded worker interface for %s\n", workerId.toString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
// Make request for each granule
|
||||
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
|
||||
granuleStartKey = blobGranuleMapping[i].key;
|
||||
granuleEndKey = blobGranuleMapping[i + 1].key;
|
||||
// if this was a time travel and the request returned larger bounds, skip this chunk
|
||||
if (granuleEndKey <= keyRange.begin) {
|
||||
continue;
|
||||
}
|
||||
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
|
||||
// prune first/last granules to requested range
|
||||
if (keyRange.begin > granuleStartKey) {
|
||||
granuleStartKey = keyRange.begin;
|
||||
}
|
||||
if (keyRange.end < granuleEndKey) {
|
||||
granuleEndKey = keyRange.end;
|
||||
}
|
||||
|
||||
state BlobGranuleFileRequest req;
|
||||
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
|
||||
req.beginVersion = begin;
|
||||
req.readVersion = endVersion;
|
||||
|
||||
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
|
||||
v.push_back(makeReference<ReferencedInterface<BlobWorkerInterface>>(cx->blobWorker_interf[workerId]));
|
||||
state Reference<MultiInterface<ReferencedInterface<BlobWorkerInterface>>> location =
|
||||
makeReference<BWLocationInfo>(v);
|
||||
// use load balance with one option for now for retry and error handling
|
||||
BlobGranuleFileReply rep = wait(loadBalance(location,
|
||||
&BlobWorkerInterface::blobGranuleFileRequest,
|
||||
req,
|
||||
TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce::False,
|
||||
nullptr));
|
||||
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
fmt::print("Blob granule request for [{0} - {1}) @ {2} - {3} got reply from {4}:\n",
|
||||
granuleStartKey.printable(),
|
||||
granuleEndKey.printable(),
|
||||
begin,
|
||||
endVersion,
|
||||
workerId.toString());
|
||||
}
|
||||
for (auto& chunk : rep.chunks) {
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf("[%s - %s)\n",
|
||||
chunk.keyRange.begin.printable().c_str(),
|
||||
chunk.keyRange.end.printable().c_str());
|
||||
|
||||
printf(" SnapshotFile:\n %s\n",
|
||||
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
|
||||
printf(" DeltaFiles:\n");
|
||||
for (auto& df : chunk.deltaFiles) {
|
||||
printf(" %s\n", df.toString().c_str());
|
||||
}
|
||||
printf(" Deltas: (%d)", chunk.newDeltas.size());
|
||||
if (chunk.newDeltas.size() > 0) {
|
||||
fmt::print(" with version [{0} - {1}]",
|
||||
chunk.newDeltas[0].version,
|
||||
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
|
||||
}
|
||||
fmt::print(" IncludedVersion: {}\n", chunk.includedVersion);
|
||||
printf("\n\n");
|
||||
}
|
||||
Arena a;
|
||||
a.dependsOn(rep.arena);
|
||||
results.send(Standalone<BlobGranuleChunkRef>(chunk, a));
|
||||
keyRange = KeyRangeRef(chunk.keyRange.end, keyRange.end);
|
||||
}
|
||||
}
|
||||
results.sendError(end_of_stream());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw;
|
||||
}
|
||||
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
|
||||
e.code() == error_code_connection_failed) {
|
||||
// TODO would invalidate mapping cache here if we had it
|
||||
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
|
||||
} else {
|
||||
if (BG_REQUEST_DEBUG) {
|
||||
printf("blob granule file request got unexpected error %s\n", e.name());
|
||||
}
|
||||
results.sendError(e);
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> DatabaseContext::readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results,
|
||||
KeyRange range,
|
||||
Version begin,
|
||||
Optional<Version> end) {
|
||||
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
return readBlobGranulesStreamActor(Reference<DatabaseContext>::addRef(this), results, range, begin, end);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
if (lockAware) {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
}
|
||||
|
||||
tr.set(perpetualStorageWiggleKey, enable ? "1"_sr : "0"_sr);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
|
||||
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue