Refactored blob granule reads to actually be in NativeAPI

Josh Slocum 2021-08-25 15:01:38 -05:00
parent ad899565e9
commit 714aca4f3c
9 changed files with 378 additions and 254 deletions

View File

@@ -0,0 +1,111 @@
/*
* BlobGranuleCommon.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_BLOBGRANULECOMMON_H
#define FDBCLIENT_BLOBGRANULECOMMON_H
#pragma once
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
// TODO better place for this? It's used in change feeds too
struct MutationsAndVersionRef {
VectorRef<MutationRef> mutations;
Version version;
MutationsAndVersionRef() {}
explicit MutationsAndVersionRef(Version version) : version(version) {}
MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version)
: mutations(mutations), version(version) {}
MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version)
: mutations(to, mutations), version(version) {}
MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from)
: mutations(to, from.mutations), version(from.version) {}
int expectedSize() const { return mutations.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mutations, version);
}
};
// TODO should GranuleSnapshot and GranuleDeltas just be typedefs instead of subclasses?
// file format of actual blob files
struct GranuleSnapshot : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 1300395;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<KeyValueRef>&)*this));
}
};
struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
constexpr static FileIdentifier file_identifier = 8563013;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<MutationsAndVersionRef>&)*this));
}
};
// TODO better name?
struct BlobFilenameRef {
constexpr static FileIdentifier file_identifier = 5253554;
StringRef filename;
int64_t offset;
int64_t length;
BlobFilenameRef() {}
BlobFilenameRef(Arena& to, std::string filename, int64_t offset, int64_t length)
: filename(to, filename), offset(offset), length(length) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, filename, offset, length);
}
std::string toString() const {
std::stringstream ss;
ss << filename.toString() << ":" << offset << ":" << length;
return std::move(ss).str();
}
};
// the assumption of this response is that the client will deserialize the files and apply the mutations themselves
// TODO could filter out delta files that don't intersect the key range being requested?
// TODO since client request passes version, we don't need to include the version of each mutation in the response if we
// pruned it there
struct BlobGranuleChunkRef {
constexpr static FileIdentifier file_identifier = 991434;
KeyRangeRef keyRange;
Version includedVersion;
Optional<BlobFilenameRef> snapshotFile; // not set if it's an incremental read
VectorRef<BlobFilenameRef> deltaFiles;
GranuleDeltas newDeltas;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, includedVersion, snapshotFile, deltaFiles, newDeltas);
}
};
#endif
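
(Editor's note: a minimal sketch, not part of the commit, of how a client might fold a chunk's in-memory deltas into already-materialized snapshot rows, per the comment above BlobGranuleChunkRef. The std::map row representation and the applyDeltas helper are illustrative assumptions.)

// Hypothetical helper: apply a chunk's newDeltas on top of materialized rows.
static void applyDeltas(std::map<Key, Value>& rows, const GranuleDeltas& deltas, Version readVersion) {
	for (const MutationsAndVersionRef& mv : deltas) {
		if (mv.version > readVersion) {
			break; // assumes deltas are version-ordered; skip anything past the read version
		}
		for (const MutationRef& m : mv.mutations) {
			if (m.type == MutationRef::SetValue) {
				rows[m.param1] = m.param2;
			} else if (m.type == MutationRef::ClearRange) {
				// param1/param2 are the begin/end keys of the cleared range
				rows.erase(rows.lower_bound(m.param1), rows.lower_bound(m.param2));
			}
		}
	}
}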

View File

@@ -22,8 +22,7 @@
#define FDBCLIENT_BLOBWORKERINTERFACE_H
#pragma once
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/StorageServerInterface.h" // just for MutationsAndVersion, TODO pull that out maybe?
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
@@ -45,6 +44,7 @@ struct BlobWorkerInterface {
NetworkAddress address() const { return blobGranuleFileRequest.getEndpoint().getPrimaryAddress(); }
bool operator==(const BlobWorkerInterface& r) const { return id() == r.id(); }
bool operator!=(const BlobWorkerInterface& r) const { return !(*this == r); }
std::string toString() const { return id().shortString(); }
template <class Archive>
void serialize(Archive& ar) {
@@ -52,68 +52,6 @@
}
};
// TODO should name all things that don't have their own arena *Ref
// file format of actual blob files
struct GranuleSnapshot : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 1300395;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<KeyValueRef>&)*this));
}
};
struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
constexpr static FileIdentifier file_identifier = 8563013;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<MutationsAndVersionRef>&)*this));
}
};
// TODO better name?
struct BlobFilenameRef {
constexpr static FileIdentifier file_identifier = 5253554;
StringRef filename;
int64_t offset;
int64_t length;
BlobFilenameRef() {}
BlobFilenameRef(Arena& to, std::string filename, int64_t offset, int64_t length)
: filename(to, filename), offset(offset), length(length) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, filename, offset, length);
}
std::string toString() {
std::stringstream ss;
ss << filename.toString() << ":" << offset << ":" << length;
return std::move(ss).str();
}
};
// the assumption of this response is that the client will deserialize the files and apply the mutations themselves
// TODO could filter out delta files that don't intersect the key range being requested?
// TODO since client request passes version, we don't need to include the version of each mutation in the response if we
// pruned it there
struct BlobGranuleChunkRef {
constexpr static FileIdentifier file_identifier = 991434;
KeyRangeRef keyRange;
Version includedVersion;
Optional<BlobFilenameRef> snapshotFile; // not set if it's an incremental read
VectorRef<BlobFilenameRef> deltaFiles;
GranuleDeltas newDeltas;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, includedVersion, snapshotFile, deltaFiles, newDeltas);
}
};
struct BlobGranuleFileReply {
// TODO is there a "proper" way to generate file_identifier?
constexpr static FileIdentifier file_identifier = 6858612;

View File

@@ -20,6 +20,7 @@ set(FDBCLIENT_SRCS
BlobWorkerInterface.h
BlobGranuleReader.actor.cpp
BlobGranuleReader.actor.h
BlobGranuleCommon.h
ClientKnobCollection.cpp
ClientKnobCollection.h
ClientKnobs.cpp

View File

@@ -267,6 +267,12 @@ public:
Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingRangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popRangeFeedMutations(StringRef rangeID, Version version);
Future<Void> getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range);
Future<Void> readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results,
KeyRange range,
Version begin = 0,
Version end = std::numeric_limits<Version>::max());
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
@@ -337,6 +343,7 @@ public:
CoalescedKeyRangeMap<Reference<LocationInfo>> locationCache;
std::map<UID, StorageServerInfo*> server_interf;
std::map<UID, BlobWorkerInterface> blobWorker_interf; // blob workers don't change endpoints for the same ID
// map from ssid -> tss interface
std::unordered_map<UID, StorageServerInterface> tssMapping;

View File

@@ -6770,6 +6770,235 @@ ACTOR Future<Void> popRangeFeedMutationsActor(Reference<DatabaseContext> db, Str
Future<Void> DatabaseContext::popRangeFeedMutations(StringRef rangeID, Version version) {
return popRangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, version);
}
// FIXME: code for discovering blob granules is similar enough that it could be refactored? It's pretty simple though
ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db,
PromiseStream<KeyRange> results,
KeyRange keyRange) {
state Database cx(db);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state KeyRange currentRange = keyRange;
printf(
"Getting Blob Granules for [%s - %s)\n", keyRange.begin.printable().c_str(), keyRange.end.printable().c_str());
loop {
try {
state RangeResult blobGranuleMapping = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, currentRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
for (int i = 0; i < blobGranuleMapping.size() - 1; i++) {
if (blobGranuleMapping[i].value.size()) {
results.send(KeyRangeRef(blobGranuleMapping[i].key, blobGranuleMapping[i + 1].key));
}
}
if (blobGranuleMapping.more) {
currentRange = KeyRangeRef(blobGranuleMapping.back().key, currentRange.end);
} else {
results.sendError(end_of_stream());
return Void();
}
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
Future<Void> DatabaseContext::getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range) {
return getBlobGranuleRangesStreamActor(Reference<DatabaseContext>::addRef(this), results, range);
}
// hack (for now) to get blob worker interface into load balance
struct BWLocationInfo : MultiInterface<ReferencedInterface<BlobWorkerInterface>> {
using Locations = MultiInterface<ReferencedInterface<BlobWorkerInterface>>;
explicit BWLocationInfo(const std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>>& v) : Locations(v) {}
};
ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db,
PromiseStream<Standalone<BlobGranuleChunkRef>> results,
KeyRange keyRange,
Version begin,
Optional<Version> end) { // end not present is just latest
state Database cx(db);
try {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state RangeResult blobGranuleMapping;
state Version endVersion;
state Key granuleStartKey;
state Key granuleEndKey;
state int i;
state UID workerId;
// Read mapping and worker interfaces from DB
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02")));
// state KeyRange keyRange = KeyRange(KeyRangeRef());
if (end.present()) {
endVersion = end.get();
} else {
Version _end = wait(tr->getReadVersion());
endVersion = _end;
}
// Right now just read whole blob range assignments from DB
// FIXME: eventually we probably want to cache this and invalidate similarly to storage servers.
// Cache misses could still read from the DB, or we could add it to the Transaction State Store and have
// proxies serve it from memory.
RangeResult _bgMapping = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
blobGranuleMapping = _bgMapping;
if (blobGranuleMapping.more) {
// TODO REMOVE
printf("BG Mapping for [%s - %s) too large!\n");
throw unsupported_operation();
}
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
if (blobGranuleMapping.size() == 0) {
printf("no blob worker assignments yet \n");
throw transaction_too_old();
}
printf("Doing blob granule request @ %lld\n", endVersion);
printf("blob worker assignments:\n");
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
granuleStartKey = blobGranuleMapping[i].key;
granuleEndKey = blobGranuleMapping[i + 1].key;
if (!blobGranuleMapping[i].value.size()) {
printf("Key range [%s - %s) missing worker assignment!\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str());
// TODO probably new exception type instead
throw transaction_too_old();
}
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
printf(" [%s - %s): %s\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
workerId.toString().c_str());
if (!cx->blobWorker_interf.count(workerId)) {
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
ASSERT(workerInterface.present());
cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get());
printf(" decoded worker interface for %s\n", workerId.toString().c_str());
}
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
// Make request for each granule
		for (i = 0; i < blobGranuleMapping.size() - 1; i++) { // last entry only marks the end key
granuleStartKey = blobGranuleMapping[i].key;
granuleEndKey = blobGranuleMapping[i + 1].key;
workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
// prune first/last granules to requested range
if (i == 0) {
granuleStartKey = keyRange.begin;
}
if (i == blobGranuleMapping.size() - 2) {
granuleEndKey = keyRange.end;
}
state BlobGranuleFileRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
req.beginVersion = begin;
req.readVersion = endVersion;
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
v.push_back(makeReference<ReferencedInterface<BlobWorkerInterface>>(cx->blobWorker_interf[workerId]));
state Reference<MultiInterface<ReferencedInterface<BlobWorkerInterface>>> location =
makeReference<BWLocationInfo>(v);
// use load balance with one option for now for retry and error handling
BlobGranuleFileReply rep = wait(loadBalance(location,
&BlobWorkerInterface::blobGranuleFileRequest,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::False,
nullptr));
/*ErrorOr<BlobGranuleFileReply> _rep =
wait(cx->blobWorker_interf[workerId].blobGranuleFileRequest.tryGetReply(req));
if (_rep.isError()) {
throw _rep.getError();
}
BlobGranuleFileReply rep = _rep.get();*/
printf("Blob granule request for [%s - %s) @ %lld - %lld got reply from %s:\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
begin,
endVersion,
workerId.toString().c_str());
for (auto& chunk : rep.chunks) {
printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
printf(" IncludedVersion: %lld\n", chunk.includedVersion);
printf("\n\n");
Arena a;
a.dependsOn(rep.arena);
results.send(Standalone<BlobGranuleChunkRef>(chunk, a));
}
/*state PromiseStream<RangeResult> results;
state Future<Void> granuleReader = readBlobGranules(req, rep, bstore, results);
try {
loop {
// printf("Waiting for result chunk\n");
RangeResult result = waitNext(results.getFuture());
printf("Result chunk (%d):\n", result.size());
int resultIdx = 0;
for (auto& it : result) {
printf(" %s=%s\n", it.key.printable().c_str(), it.value.printable().c_str());
resultIdx++;
if (resultIdx >= 10) {
break;
}
}
if (resultIdx >= 10) {
printf(" ...\n");
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
printf("granule reader got unexpected error %s\n", e.name());
} else {
// printf("granule reader got end of stream\n");
printf("\n");
}
}*/
}
results.sendError(end_of_stream());
	} catch (Error& e) {
		printf("blob granule file request got error %s\n", e.name());
		// forward the error so stream consumers see it instead of waiting forever
		results.sendError(e);
	}
return Void();
}
Future<Void> DatabaseContext::readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results,
KeyRange range,
Version begin,
Version end) {
return readBlobGranulesStreamActor(Reference<DatabaseContext>::addRef(this), results, range, begin, end);
}
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
state ReadYourWritesTransaction tr(cx);
loop {

View File

@@ -31,6 +31,7 @@
#include "flow/flow.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/FDBOptions.g.h"

View File

@@ -18,7 +18,9 @@
* limitations under the License.
*/
// TODO this should really be renamed "TSSComparison.cpp"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "flow/crc32c.h" // for crc32c_append, to checksum values in tss trace events
// Includes template specializations for all tss operations on storage server types.
@@ -337,6 +339,26 @@ void TSS_traceMismatch(TraceEvent& event,
ASSERT(false);
}
template <>
bool TSS_doCompare(const BlobGranuleFileReply& src, const BlobGranuleFileReply& tss) {
ASSERT(false);
return true;
}
template <>
const char* TSS_mismatchTraceName(const BlobGranuleFileRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const BlobGranuleFileRequest& req,
const BlobGranuleFileReply& src,
const BlobGranuleFileReply& tss) {
ASSERT(false);
}
// only record metrics for data reads
template <>
@@ -381,6 +403,10 @@ void TSSMetrics::recordLatency(const RangeFeedRequest& req, double ssLatency, do
template <>
void TSSMetrics::recordLatency(const OverlappingRangeFeedsRequest& req, double ssLatency, double tssLatency) {}
// this isn't even to storage servers
template <>
void TSSMetrics::recordLatency(const BlobGranuleFileRequest& req, double ssLatency, double tssLatency) {}
// -------------------
TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {

View File

@@ -31,6 +31,7 @@
#include "fdbrpc/TimedRequest.h"
#include "fdbrpc/TSSComparison.h"
#include "fdbclient/TagThrottle.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/CommitTransaction.h"
#include "flow/UnitTest.h"
@@ -631,26 +632,6 @@
}
};
struct MutationsAndVersionRef {
VectorRef<MutationRef> mutations;
Version version;
MutationsAndVersionRef() {}
explicit MutationsAndVersionRef(Version version) : version(version) {}
MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version)
: mutations(mutations), version(version) {}
MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version)
: mutations(to, mutations), version(version) {}
MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from)
: mutations(to, from.mutations), version(from.version) {}
int expectedSize() const { return mutations.expectedSize(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mutations, version);
}
};
struct RangeFeedReply {
constexpr static FileIdentifier file_identifier = 11815134;
VectorRef<MutationsAndVersionRef> mutations;

View File

@@ -42,8 +42,6 @@
#include "fdbserver/Status.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE
#include "fdbclient/BlobGranuleReader.actor.h" // TODO REMOVE
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbclient/ReadYourWrites.h"
@@ -3092,172 +3090,6 @@ public:
}
};
// TODO REMOVE!!!!
ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, RatekeeperInterface interf) {
state Reference<BackupContainerFileSystem> bstore;
state bool doTimeTravel = false;
// TODO CHANGE BACK
// wait(delay(10.0));
wait(delay(70.0));
printf("Initializing CC s3 stuff\n");
try {
if (g_network->isSimulated()) {
printf("CC constructing simulated backup container\n");
bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
} else {
printf("CC constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
printf("CC constructed backup container\n");
}
} catch (Error& e) {
printf("CC got backup container init error %s\n", e.name());
return Void();
}
state std::unordered_map<UID, BlobWorkerInterface> workerInterfaceCache;
state int i;
loop {
try {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02")));
// state KeyRange keyRange = KeyRange(KeyRangeRef());
state Version v = wait(tr->getReadVersion());
if (doTimeTravel == 1) {
printf("Doing time travel read\n");
v -= 60000000;
} else if (doTimeTravel) {
printf("Doing live read\n");
}
doTimeTravel = !doTimeTravel;
/*if (deterministicRandom()->random01() < 0.3) {
v -= 5000000;
} else if (deterministicRandom()->random01() < 0.3) {
v -= 30000000;
}*/
// right now just read whole blob range
state RangeResult blobGranuleMapping = wait(krmGetRanges(
tr, blobGranuleMappingKeys.begin, normalKeys /*keyRange*/, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
if (blobGranuleMapping.size() == 0) {
printf("no blob worker assignments yet \n");
throw transaction_too_old();
}
printf("Doing blob granule request @ %lld\n", v);
// printf("blob worker assignments:\n");
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
state Key granuleStartKey = blobGranuleMapping[i].key;
state Key granuleEndKey = blobGranuleMapping[i + 1].key;
if (!blobGranuleMapping[i].value.size()) {
/*printf("Key range [%s - %s) missing worker assignment!\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str());*/
// TODO probably new exception type instead
// TODO ADD BACK
// throw transaction_too_old();
continue;
}
state UID workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
printf(" [%s - %s): %s\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
workerId.toString().c_str());
// TODO CHANGE BACK!!
// prune first/last granules to requested range
/*if (i == 0) {
granuleStartKey = keyRange.begin;
}
if (i == blobGranuleMapping.size() - 2) {
granuleEndKey = keyRange.end;
}*/
if (!workerInterfaceCache.count(workerId)) {
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
ASSERT(workerInterface.present());
workerInterfaceCache[workerId] = decodeBlobWorkerListValue(workerInterface.get());
// printf(" decoded worker interface for %s\n", workerId.toString().c_str());
}
state BlobGranuleFileRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
req.readVersion = v;
ErrorOr<BlobGranuleFileReply> _rep =
wait(workerInterfaceCache[workerId].blobGranuleFileRequest.tryGetReply(req));
if (_rep.isError()) {
throw _rep.getError();
}
BlobGranuleFileReply rep = _rep.get();
printf("Blob granule request for [%s - %s) @ %lld got reply from %s:\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
v,
workerId.toString().c_str());
for (auto& chunk : rep.chunks) {
printf("[%s - %s)\n",
chunk.keyRange.begin.printable().c_str(),
chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
printf(" IncludedVersion: %lld\n", chunk.includedVersion);
printf("\n\n");
}
state PromiseStream<RangeResult> results;
state Future<Void> granuleReader = readBlobGranules(req, rep, bstore, results);
try {
loop {
// printf("Waiting for result chunk\n");
RangeResult result = waitNext(results.getFuture());
printf("Result chunk (%d):\n", result.size());
int resultIdx = 0;
for (auto& it : result) {
printf(" %s=%s\n", it.key.printable().c_str(), it.value.printable().c_str());
resultIdx++;
if (resultIdx >= 10) {
break;
}
}
if (resultIdx >= 10) {
printf(" ...\n");
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
printf("granule reader got unexpected error %s\n", e.name());
} else {
// printf("granule reader got end of stream\n");
printf("\n");
}
}
}
} catch (Error& e) {
printf("blob granule file request got error %s\n", e.name());
}
wait(delay(5.0 + 5.0 * deterministicRandom()->random01()));
}
}
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, ClusterControllerData::DBInfo* db) {
state MasterInterface iMaster;
@@ -4806,8 +4638,6 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData* self) {
}
if (!ratekeeper.present() || ratekeeper.get().id() != interf.get().id()) {
self->db.setRatekeeper(interf.get());
// TODO REMOVE
self->ac.add(doBlobGranuleRequests(self, interf.get()));
}
checkOutstandingRequests(self);
return Void();