Reader can read blob worker interface and mapping from db, read files, apply mutations, and return result

This commit is contained in:
Josh Slocum 2021-07-22 12:14:30 -05:00
parent d8bdbddb1f
commit e5b0cf20aa
12 changed files with 1090 additions and 283 deletions


@ -0,0 +1,437 @@
/*
* BlobGranuleReader.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/SystemData.h" // for allKeys unit test - could remove
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// TODO more efficient data structure besides std::map? PTree is unnecessary since this isn't versioned, but some other
// sorted thing could work. And if it used arenas it'd probably be more efficient with allocations, since everything
// else is in 1 arena and discarded at the end.
// TODO could refactor the ~6 lines of file-reading code shared between this and the delta file function into another
// actor, so this part would also be testable? but meh
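// Reads an entire snapshot file from blob storage, parses it, and loads the rows that fall within keyRange into
// dataMap.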
ACTOR Future<Void> readSnapshotFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
Arena arena,
KeyRangeRef keyRange,
std::map<KeyRef, ValueRef>* dataMap) {
state AsyncFileS3BlobStoreRead reader(bstore, bucket, filename);
state int64_t size = wait(reader.size());
state uint8_t* data = new (arena) uint8_t[size];
printf("Reading %d bytes from snapshot file %s\n", size, filename.c_str());
int readSize = wait(reader.read(data, size, 0));
printf("Read %d bytes from snapshot file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
StringRef dataRef(data, size);
GranuleSnapshot snapshot = ObjectReader::fromStringRef<GranuleSnapshot>(dataRef, Unversioned());
printf("Parsed %d rows from snapshot file %s\n", snapshot.size(), filename.c_str());
int i = 0;
while (i < snapshot.size() && snapshot[i].key < keyRange.begin) {
i++;
}
while (i < snapshot.size() && snapshot[i].key < keyRange.end) {
dataMap->insert({ snapshot[i].key, snapshot[i].value });
i++;
}
printf("Started with %zu rows from snapshot file %s\n", dataMap->size(), filename.c_str());
return Void();
}
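// Reads and parses an entire delta file from blob storage. Filtering by key range and version is left to the caller
// (see applyDeltas below).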
ACTOR Future<GranuleDeltas> readDeltaFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
Arena arena,
KeyRangeRef keyRange,
Version readVersion) {
state AsyncFileS3BlobStoreRead reader(bstore, bucket, filename);
state int64_t size = wait(reader.size());
state uint8_t* data = new (arena) uint8_t[size];
printf("Reading %d bytes from delta file %s\n", size, filename.c_str());
int readSize = wait(reader.read(data, size, 0));
printf("Read %d bytes from delta file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
// Don't do range or version filtering here, since we'd have to copy/rewrite the deltas and it might starve the
// snapshot read task; do it on the main thread instead
StringRef dataRef(data, size);
GranuleDeltas deltas = ObjectReader::fromStringRef<GranuleDeltas>(dataRef, Unversioned());
printf("Parsed %d deltas from delta file %s\n", deltas.size(), filename.c_str());
return deltas;
}
// TODO unit test this!
// TODO this giant switch is mostly lifted from the storage server. Could refactor the atomics into a generic
// "handle this atomic mutation" helper instead of duplicating the switch statement everywhere?
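// Applies a single mutation to the in-memory data map: clears erase the overlapping keys, sets overwrite the value,
// and atomic ops are resolved against the existing value. Mutations outside keyRange are ignored.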
static void applyDelta(std::map<KeyRef, ValueRef>* dataMap, Arena& ar, KeyRangeRef keyRange, MutationRef m) {
if (m.type == MutationRef::ClearRange) {
if (m.param2 <= keyRange.begin || m.param1 >= keyRange.end) {
return;
}
// m.param1 (the clear begin) is inclusive and m.param2 (the clear end) is exclusive. std::map::lower_bound
// returns the first key >= its argument, which is exactly the begin iterator and the (exclusive) end iterator
// that erase expects, so no further adjustment is needed.
std::map<KeyRef, ValueRef>::iterator itStart = dataMap->lower_bound(m.param1);
std::map<KeyRef, ValueRef>::iterator itEnd = dataMap->lower_bound(m.param2);
dataMap->erase(itStart, itEnd);
} else {
if (m.param1 < keyRange.begin || m.param1 >= keyRange.end) {
return;
}
std::map<KeyRef, ValueRef>::iterator it = dataMap->find(m.param1);
if (m.type != MutationRef::SetValue) {
Optional<StringRef> oldVal;
if (it != dataMap->end()) {
oldVal = it->second;
}
switch (m.type) {
case MutationRef::AddValue:
m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
break;
case MutationRef::And:
m.param2 = doAnd(oldVal, m.param2, ar);
break;
case MutationRef::Or:
m.param2 = doOr(oldVal, m.param2, ar);
break;
case MutationRef::Xor:
m.param2 = doXor(oldVal, m.param2, ar);
break;
case MutationRef::AppendIfFits:
m.param2 = doAppendIfFits(oldVal, m.param2, ar);
break;
case MutationRef::Max:
m.param2 = doMax(oldVal, m.param2, ar);
break;
case MutationRef::Min:
m.param2 = doMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMin:
m.param2 = doByteMin(oldVal, m.param2, ar);
break;
case MutationRef::ByteMax:
m.param2 = doByteMax(oldVal, m.param2, ar);
break;
case MutationRef::MinV2:
m.param2 = doMinV2(oldVal, m.param2, ar);
break;
case MutationRef::AndV2:
m.param2 = doAndV2(oldVal, m.param2, ar);
break;
case MutationRef::CompareAndClear:
if (oldVal.present() && m.param2 == oldVal.get()) {
m.type = MutationRef::ClearRange;
m.param2 = keyAfter(m.param1, ar);
applyDelta(dataMap, ar, keyRange, m);
}
return;
}
}
if (it == dataMap->end()) {
dataMap->insert({ m.param1, m.param2 });
} else {
it->second = m.param2;
}
}
}
// TODO might want to change this to an actor so it can yield periodically?
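// Applies the deltas in order, stopping at the first mutation with version > readVersion.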
static void applyDeltas(std::map<KeyRef, ValueRef>* dataMap,
Arena& arena,
GranuleDeltas deltas,
KeyRangeRef keyRange,
Version readVersion) {
for (MutationAndVersion& delta : deltas) {
if (delta.v > readVersion) {
break;
}
applyDelta(dataMap, arena, keyRange, delta.m);
}
}
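// Assembles a single granule chunk: reads the snapshot and delta files in parallel, applies the file deltas and the
// in-memory newDeltas up to readVersion, and returns the resulting rows for keyRange.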
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
KeyRangeRef keyRange,
Version readVersion,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket) {
// arena to hold everything for parsing and updating mutations. Most of it will likely be tossed, so we copy at the
// end instead of doing a dependsOn
state Arena arena;
state std::map<KeyRef, ValueRef> dataMap;
Future<Void> readSnapshotFuture =
readSnapshotFile(bstore, bucket, chunk.snapshotFileName.toString(), arena, keyRange, &dataMap);
state std::vector<Future<GranuleDeltas>> readDeltaFutures;
readDeltaFutures.reserve(chunk.deltaFileNames.size());
for (StringRef deltaFileName : chunk.deltaFileNames) {
readDeltaFutures.push_back(
readDeltaFile(bstore, bucket, deltaFileName.toString(), arena, keyRange, readVersion));
}
wait(readSnapshotFuture);
for (Future<GranuleDeltas> deltaFuture : readDeltaFutures) {
GranuleDeltas result = wait(deltaFuture);
applyDeltas(&dataMap, arena, result, keyRange, readVersion);
wait(yield());
}
printf("Applying %d memory deltas\n", chunk.newDeltas.size());
applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion);
wait(yield());
RangeResult ret;
for (auto& it : dataMap) {
ret.push_back_deep(ret.arena(), KeyValueRef(it.first, it.second));
// TODO for large reads, probably want to yield periodically here to avoid a slow task
}
printf("Final processing ended up with %d rows\n", ret.size());
return ret;
}
// TODO probably should add things like limit/bytelimit at some point?
ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
BlobGranuleFileReply reply,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
PromiseStream<RangeResult> results) {
// TODO for large amount of chunks, this should probably have some sort of buffer limit like ReplyPromiseStream.
// Maybe just use ReplyPromiseStream instead of PromiseStream?
try {
state int i;
for (i = 0; i < reply.chunks.size(); i++) {
printf("ReadBlobGranules processing chunk %d [%s - %s)\n",
i,
reply.chunks[i].keyRange.begin.printable().c_str(),
reply.chunks[i].keyRange.end.printable().c_str());
RangeResult chunkResult =
wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore, bucket));
results.send(std::move(chunkResult));
}
printf("ReadBlobGranules done, sending EOS\n");
results.sendError(end_of_stream());
} catch (Error& e) {
printf("ReadBlobGranules got error %s\n", e.name());
results.sendError(e);
}
return Void();
}
TEST_CASE("/blobgranule/reader/applyDelta") {
printf("Testing blob granule deltas\n");
Arena a;
// do this 2 phase arena creation of string refs instead of LiteralStringRef because there is no char* StringRef
// constructor, and valgrind might complain if the stringref data isn't in the arena
std::string sk_a = "A";
std::string sk_ab = "AB";
std::string sk_b = "B";
std::string sk_c = "C";
std::string sk_z = "Z";
std::string sval1 = "1";
std::string sval2 = "2";
StringRef k_a = StringRef(a, sk_a);
StringRef k_ab = StringRef(a, sk_ab);
StringRef k_b = StringRef(a, sk_b);
StringRef k_c = StringRef(a, sk_c);
StringRef k_z = StringRef(a, sk_z);
StringRef val1 = StringRef(a, sval1);
StringRef val2 = StringRef(a, sval2);
std::map<KeyRef, ValueRef> data;
data.insert({ k_a, val1 });
data.insert({ k_ab, val1 });
data.insert({ k_b, val1 });
std::map<KeyRef, ValueRef> correctData = data;
std::map<KeyRef, ValueRef> originalData = data;
ASSERT(data == correctData);
// test all clear permutations
MutationRef mClearEverything(MutationRef::ClearRange, allKeys.begin, allKeys.end);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearEverything);
correctData.clear();
ASSERT(data == correctData);
MutationRef mClearEverything2(MutationRef::ClearRange, allKeys.begin, k_c);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearEverything2);
correctData.clear();
ASSERT(data == correctData);
MutationRef mClearEverything3(MutationRef::ClearRange, k_a, allKeys.end);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearEverything3);
correctData.clear();
ASSERT(data == correctData);
MutationRef mClearEverything4(MutationRef::ClearRange, k_a, k_c);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearEverything4);
correctData.clear();
ASSERT(data == correctData);
MutationRef mClearFirst(MutationRef::ClearRange, k_a, k_ab);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearFirst);
correctData.erase(k_a);
ASSERT(data == correctData);
MutationRef mClearSecond(MutationRef::ClearRange, k_ab, k_b);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearSecond);
correctData.erase(k_ab);
ASSERT(data == correctData);
MutationRef mClearThird(MutationRef::ClearRange, k_b, k_c);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearThird);
correctData.erase(k_b);
ASSERT(data == correctData);
MutationRef mClearFirst2(MutationRef::ClearRange, k_a, k_b);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearFirst2);
correctData.erase(k_a);
correctData.erase(k_ab);
ASSERT(data == correctData);
MutationRef mClearLast2(MutationRef::ClearRange, k_ab, k_c);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mClearLast2);
correctData.erase(k_ab);
correctData.erase(k_b);
ASSERT(data == correctData);
// test set data
MutationRef mSetA(MutationRef::SetValue, k_a, val2);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mSetA);
correctData[k_a] = val2;
ASSERT(data == correctData);
MutationRef mSetAB(MutationRef::SetValue, k_ab, val2);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mSetAB);
correctData[k_ab] = val2;
ASSERT(data == correctData);
MutationRef mSetB(MutationRef::SetValue, k_b, val2);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mSetB);
correctData[k_b] = val2;
ASSERT(data == correctData);
MutationRef mSetC(MutationRef::SetValue, k_c, val2);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mSetC);
correctData[k_c] = val2;
ASSERT(data == correctData);
// test pruning deltas that are outside of the key range
MutationRef mSetZ(MutationRef::SetValue, k_z, val2);
data = originalData;
applyDelta(&data, a, KeyRangeRef(k_a, k_c), mSetZ);
ASSERT(data == originalData);
applyDelta(&data, a, KeyRangeRef(k_ab, k_c), mSetA);
ASSERT(data == originalData);
applyDelta(&data, a, KeyRangeRef(k_ab, k_c), mClearFirst);
ASSERT(data == originalData);
applyDelta(&data, a, KeyRangeRef(k_a, k_ab), mClearThird);
ASSERT(data == originalData);
// Could test all the other atomic ops, but if set, max, and compare+clear work, and the others all just directly call
// the atomics, there is little to test
MutationRef mCAndC1(MutationRef::CompareAndClear, k_a, val1);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mCAndC1);
correctData.erase(k_a);
ASSERT(data == correctData);
MutationRef mCAndC2(MutationRef::CompareAndClear, k_a, val2);
data = originalData;
applyDelta(&data, a, allKeys, mCAndC2);
ASSERT(data == originalData);
MutationRef mCAndCZ(MutationRef::CompareAndClear, k_z, val2);
data = originalData;
applyDelta(&data, a, allKeys, mCAndCZ);
ASSERT(data == originalData);
MutationRef mMaxA(MutationRef::ByteMax, k_a, val2);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mMaxA);
correctData[k_a] = val2;
ASSERT(data == correctData);
MutationRef mMaxC(MutationRef::ByteMax, k_c, val2);
data = originalData;
correctData = originalData;
applyDelta(&data, a, allKeys, mMaxC);
correctData[k_c] = val2;
ASSERT(data == correctData);
return Void();
}


@ -0,0 +1,51 @@
/*
* BlobGranuleReader.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(BLOB_GRANULE_READER_CLIENT_G_H)
#define BLOB_GRANULE_READER_CLIENT_G_H
#include "fdbclient/BlobGranuleReader.actor.g.h"
#elif !defined(BLOB_GRANULE_READER_CLIENT_H)
#define BLOB_GRANULE_READER_CLIENT_H
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/S3BlobStore.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Reads the file set in the reply using the provided blob store, and filters data and mutations by key + version from
// the request
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
KeyRangeRef keyRange,
Version readVersion,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket);
ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
BlobGranuleFileReply reply,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
PromiseStream<RangeResult> results);
#include "flow/unactorcompiler.h"
#endif


@ -0,0 +1,134 @@
/*
* BlobWorkerInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_BLOBWORKERINTERFACE_H
#define FDBCLIENT_BLOBWORKERINTERFACE_H
#pragma once
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
struct BlobWorkerInterface {
constexpr static FileIdentifier file_identifier = 8358753;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct BlobGranuleFileRequest> blobGranuleFileRequest;
struct LocalityData locality;
UID myId;
BlobWorkerInterface() {}
explicit BlobWorkerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
void initEndpoints() {}
UID id() const { return myId; }
NetworkAddress address() const { return blobGranuleFileRequest.getEndpoint().getPrimaryAddress(); }
bool operator==(const BlobWorkerInterface& r) const { return id() == r.id(); }
bool operator!=(const BlobWorkerInterface& r) const { return !(*this == r); }
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, blobGranuleFileRequest, locality, myId);
}
};
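// A mutation paired with the version it was committed at; granule deltas (GranuleDeltas) are sequences of these.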
struct MutationAndVersion {
constexpr static FileIdentifier file_identifier = 4268041;
MutationRef m;
Version v;
MutationAndVersion() {}
MutationAndVersion(Arena& to, MutationRef m, Version v) : m(to, m), v(v) {}
MutationAndVersion(Arena& to, const MutationAndVersion& from) : m(to, from.m), v(from.v) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, m, v);
}
};
// file format of actual blob files
struct GranuleSnapshot : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 4268040;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<KeyValueRef>&)*this));
}
};
struct GranuleDeltas : VectorRef<MutationAndVersion> {
constexpr static FileIdentifier file_identifier = 4268042;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<MutationAndVersion>&)*this));
}
};
// the assumption of this response is that the client will deserialize the files and apply the mutations themselves
// TODO could filter out delta files that don't intersect the key range being requested?
// TODO since client request passes version, we don't need to include the version of each mutation in the response if we
// pruned it there
struct BlobGranuleChunk {
constexpr static FileIdentifier file_identifier = 991434;
KeyRangeRef keyRange;
StringRef snapshotFileName;
VectorRef<StringRef> deltaFileNames;
GranuleDeltas newDeltas;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, snapshotFileName, deltaFileNames, newDeltas);
}
};
struct BlobGranuleFileReply {
// TODO is there a "proper" way to generate file_identifier?
constexpr static FileIdentifier file_identifier = 6858612;
Arena arena;
VectorRef<BlobGranuleChunk> chunks;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, chunks, arena);
}
};
// TODO could do a reply promise stream of file mutations to bound memory requirements?
// Have to load whole snapshot file into memory though so it doesn't actually matter too much
struct BlobGranuleFileRequest {
constexpr static FileIdentifier file_identifier = 4150141;
Arena arena;
KeyRangeRef keyRange;
Version readVersion;
ReplyPromise<BlobGranuleFileReply> reply;
BlobGranuleFileRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, readVersion, reply, arena);
}
};
#endif


@ -17,6 +17,9 @@ set(FDBCLIENT_SRCS
BackupContainerS3BlobStore.h
ClientBooleanParams.cpp
ClientBooleanParams.h
BlobWorkerInterface.h
BlobGranuleReader.actor.cpp
BlobGranuleReader.actor.h
ClientKnobCollection.cpp
ClientKnobCollection.h
ClientKnobs.cpp


@ -1039,10 +1039,53 @@ const KeyRangeRef configClassKeys("\xff\xff/configClasses/"_sr, "\xff\xff/config
const KeyRef blobRangeChangeKey = LiteralStringRef("\xff\x02/blobRangeChange");
const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff/blobRange/"), LiteralStringRef("\xff/blobRange0"));
// TODO need something for tracking "current" granules
// blob range file data
const KeyRangeRef blobGranuleKeys(LiteralStringRef("\xff/blobGranules/"), LiteralStringRef("\xff/blobGranules0"));
const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff/blobGranuleFiles/"),
LiteralStringRef("\xff/blobGranuleFiles0"));
const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff/blobGranuleMapping/"),
LiteralStringRef("\xff/blobGranuleMapping0"));
const Value blobGranuleMappingValueFor(UID const& workerID) {
BinaryWriter wr(Unversioned());
wr << workerID;
return wr.toValue();
}
UID decodeBlobGranuleMappingValue(ValueRef const& value) {
UID workerID;
BinaryReader reader(value, Unversioned());
reader >> workerID;
return workerID;
}
const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff/blobWorkerList/"),
LiteralStringRef("\xff/blobWorkerList0"));
const Key blobWorkerListKeyFor(UID workerID) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(blobWorkerListKeys.begin);
wr << workerID;
return wr.toValue();
}
const Value blobWorkerListValue(BlobWorkerInterface const& worker) {
return ObjectWriter::toValue(worker, IncludeVersion());
}
UID decodeBlobWorkerListKey(KeyRef const& key) {
UID workerID;
BinaryReader reader(key.removePrefix(blobWorkerListKeys.begin), Unversioned());
reader >> workerID;
return workerID;
}
BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
BlobWorkerInterface interf;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(interf);
return interf;
}
// for tests
void testSSISerdes(StorageServerInterface const& ssi, bool useFB) {


@ -25,6 +25,7 @@
// Functions and constants documenting the organization of the reserved keyspace in the database beginning with "\xFF"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h
#include "fdbclient/StorageServerInterface.h"
// Don't warn on constants being defined in this file.
@ -507,8 +508,26 @@ extern const KeyRangeRef blobRangeKeys;
// blob granule keys
// \xff/blobGranule/(startKey, endKey, {snapshot,delta}, version) = [[filename]]
extern const KeyRangeRef blobGranuleKeys;
// TODO probably want to add key range of file here so we can prune requests
// TODO this also technically means clients don't have to go through the blob worker for time-travel reads if they're
// older than the last delta file.
// \xff/blobGranuleFiles/(startKey, endKey, {snapshot|delta}, version) = [[filename]]
extern const KeyRangeRef blobGranuleFileKeys;
// TODO could shrink the size of this keyspace by using something similar to tags instead of UIDs
// \xff/blobGranuleMapping/[[begin]] = [[BlobWorkerID]]
extern const KeyRangeRef blobGranuleMappingKeys;
const Value blobGranuleMappingValueFor(UID const& workerID);
UID decodeBlobGranuleMappingValue(ValueRef const& value);
// \xff/blobWorkerList/[[BlobWorkerID]] = [[BlobWorkerInterface]]
extern const KeyRangeRef blobWorkerListKeys;
const Key blobWorkerListKeyFor(UID workerID);
const Value blobWorkerListValue(BlobWorkerInterface const& interface);
UID decodeBlobWorkerListKey(KeyRef const& key);
BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
#pragma clang diagnostic pop


@ -42,6 +42,8 @@
#include "fdbserver/Status.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE
#include "fdbclient/BlobGranuleReader.actor.h" // TODO REMOVE
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbclient/ReadYourWrites.h"
@ -3089,43 +3091,134 @@ public:
// TODO REMOVE!!!!
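// Test-only actor on the cluster controller: it periodically reads the blob granule mapping, resolves each worker's
// interface, issues BlobGranuleFileRequests, and reads the resulting rows back through readBlobGranules.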
ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, RatekeeperInterface interf) {
state std::string bucket = SERVER_KNOBS->BG_BUCKET;
state Reference<S3BlobStoreEndpoint> bstore;
printf("Initializing CC s3 stuff\n");
try {
printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str());
bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL);
printf("checking if bucket %s exists\n", bucket.c_str());
bool bExists = wait(bstore->bucketExists(bucket));
if (!bExists) {
printf("Bucket %s does not exist!\n", bucket.c_str());
return Void();
}
} catch (Error& e) {
printf("CC got s3 init error %s\n", e.name());
return Void();
}
state std::unordered_map<UID, BlobWorkerInterface> workerInterfaceCache;
state int i;
loop {
try {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02")));
state Version v = wait(tr->getReadVersion());
if (deterministicRandom()->random01() < 0.3) {
v -= 5000000;
} else if (deterministicRandom()->random01() < 0.3) {
v -= 30000000;
}
printf("Doing blob granule request @ %lld\n", v);
BlobGranuleFileRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, allKeys.begin), StringRef(req.arena, allKeys.end));
req.readVersion = v;
BlobGranuleFileReply rep = wait(interf.blobGranuleFileRequest.getReply(req));
printf("Blob granule request got reply:\n");
for (auto& chunk : rep.chunks) {
printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n", chunk.snapshotFileName.toString().c_str());
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFileNames) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].v,
chunk.newDeltas[chunk.newDeltas.size() - 1].v);
}
printf("\n\n");
state RangeResult blobGranuleMapping = wait(
krmGetRanges(tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
if (blobGranuleMapping.size() == 0) {
printf("no blob worker assignments yet \n");
throw transaction_too_old();
}
printf("Doing blob granule request @ %lld\n", v);
printf("blob worker assignments:\n");
for (i = 0; i < blobGranuleMapping.size() - 1; i++) {
state Key granuleStartKey = blobGranuleMapping[i].key;
state Key granuleEndKey = blobGranuleMapping[i + 1].key;
if (!blobGranuleMapping[i].value.size()) {
printf("Key range [%s - %s) missing worker assignment!\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str());
// TODO probably new exception type instead
throw transaction_too_old();
}
state UID workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value);
printf(" [%s - %s): %s\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
workerId.toString().c_str());
if (i == 0) {
granuleStartKey = keyRange.begin;
}
if (i == blobGranuleMapping.size() - 2) {
granuleEndKey = keyRange.end;
}
if (!workerInterfaceCache.count(workerId)) {
Optional<Value> workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId)));
ASSERT(workerInterface.present());
workerInterfaceCache[workerId] = decodeBlobWorkerListValue(workerInterface.get());
printf(" decoded worker interface for %s\n", workerId.toString().c_str());
}
state BlobGranuleFileRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
req.readVersion = v;
ErrorOr<BlobGranuleFileReply> _rep =
wait(workerInterfaceCache[workerId].blobGranuleFileRequest.tryGetReply(req));
BlobGranuleFileReply rep = _rep.get();
printf("Blob granule request for [%s - %s) @ %lld got reply from %s:\n",
granuleStartKey.printable().c_str(),
granuleEndKey.printable().c_str(),
v,
workerId.toString().c_str());
for (auto& chunk : rep.chunks) {
printf("[%s - %s)\n",
chunk.keyRange.begin.printable().c_str(),
chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n", chunk.snapshotFileName.toString().c_str());
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFileNames) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].v,
chunk.newDeltas[chunk.newDeltas.size() - 1].v);
}
printf("\n\n");
}
state PromiseStream<RangeResult> results;
state Future<Void> granuleReader = readBlobGranules(req, rep, bstore, bucket, results);
try {
loop {
printf("Waiting for result chunk\n");
RangeResult result = waitNext(results.getFuture());
printf("Result chunk (%d):\n", result.size());
for (auto& it : result) {
printf(" %s=%s\n", it.key.printable().c_str(), it.value.printable().c_str());
}
}
} catch (Error& e) {
if (e.code() != error_code_end_of_stream) {
printf("granule reader got unexpected error %s\n", e.name());
} else {
printf("granule reader got end of stream\n");
}
}
}
} catch (Error& e) {
printf("blob granule file request got error %s\n", e.name());
}
wait(delay(5.0));
wait(delay(5.0 + 5.0 * deterministicRandom()->random01()));
}
}


@ -27,6 +27,7 @@
#include "fdbclient/Tuple.h" // TODO REMOVE
#include "fdbclient/S3BlobStore.h" // TODO REMOVE
#include "fdbclient/AsyncFileS3BlobStore.actor.h" // TODO REMOVE
#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE
#include "fdbclient/TagThrottle.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
@ -543,26 +544,6 @@ struct GrvProxyInfo {
}
};
// TODO MOVE elsewhere
struct GranuleSnapshot : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 4268040;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<KeyValueRef>&)*this));
}
};
struct GranuleDeltas : VectorRef<MutationAndVersion> {
constexpr static FileIdentifier file_identifier = 4268042;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<MutationAndVersion>&)*this));
}
};
// TODO make refcounted so it can go in RatekeeperData and the blob worker
struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
std::deque<std::pair<Version, std::string>> snapshotFiles;
@ -1446,6 +1427,7 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
}
}
// TODO add granule locks
ACTOR Future<std::pair<Version, std::string>> writeDeltaFile(RatekeeperData* self,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
@ -1479,7 +1461,7 @@ ACTOR Future<std::pair<Version, std::string>> writeDeltaFile(RatekeeperData* sel
Tuple deltaFileKey;
deltaFileKey.append(startKey).append(endKey);
deltaFileKey.append(LiteralStringRef("delta")).append(lastVersion);
tr->set(deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleKeys.begin), fname);
tr->set(deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
wait(tr->commit());
printf("blob worker updated fdb with delta file %s at version %lld\n", fname.c_str(), lastVersion);
@ -1544,7 +1526,7 @@ ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(RatekeeperData
Tuple snapshotFileKey;
snapshotFileKey.append(startKey).append(endKey);
snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion);
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleKeys.begin), fname);
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
wait(tr->commit());
printf("Blob worker committed new snapshot file for range\n");
return std::pair<Version, std::string>(readVersion, fname);
@ -1561,22 +1543,17 @@ ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(RatekeeperData
}
}
// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure
ACTOR Future<Void> blobWorker(RatekeeperData* self,
Reference<GranuleMetadata> metadata,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
PromiseStream<MutationAndVersion> rangeFeed,
PromiseStream<Version> snapshotVersions, // update fake range feed with actual versions
Key startKey,
Key endKey) {
state Arena deltaArena;
printf("Blob worker starting for [%s - %s) for bucket %s\n",
startKey.printable().c_str(),
endKey.printable().c_str(),
bucket.c_str());
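// Writes the initial snapshot file for the granule, then consumes the range feed and periodically writes delta
// files, recording each file in the granule metadata.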
ACTOR Future<Void> blobWorkerUpdateFiles(
RatekeeperData* self,
Reference<GranuleMetadata> metadata,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
PromiseStream<MutationAndVersion> rangeFeed,
PromiseStream<Version> snapshotVersions, // update fake range feed with actual versions
Key startKey,
Key endKey) {
// dump snapshot at the start
state Arena deltaArena;
std::pair<Version, std::string> newSnapshotFile = wait(dumpSnapshotFromFDB(self, bstore, bucket, startKey, endKey));
metadata->snapshotFiles.push_back(newSnapshotFile);
metadata->lastWriteVersion = newSnapshotFile.first;
@ -1623,142 +1600,6 @@ ACTOR Future<Void> blobWorker(RatekeeperData* self,
}
}
// dumb series of mutations that just sets/clears the same key, unrelated to actual db transactions
ACTOR Future<Void> fakeRangeFeed(PromiseStream<MutationAndVersion> mutationStream,
PromiseStream<Version> snapshotVersions,
Key startKey,
Key endKey) {
state Version version = waitNext(snapshotVersions.getFuture());
state uint32_t targetKbPerSec = (uint32_t)(SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES) / 10;
state Arena arena;
printf("Fake range feed got initial version %lld\n", version);
loop {
// dumb series of mutations that just sets/clears the same key, unrelated to actual db transactions
state uint32_t bytesGenerated = 0;
state uint32_t targetKbThisSec =
targetKbPerSec / 2 + (uint32_t)(deterministicRandom()->random01() * targetKbPerSec);
state uint32_t mutationsGenerated = 0;
while (bytesGenerated < targetKbThisSec) {
MutationAndVersion update;
update.v = version;
update.m.param1 = startKey;
update.m.param2 = startKey;
if (deterministicRandom()->random01() < 0.5) {
// clear start key
update.m.type = MutationRef::Type::ClearRange;
} else {
// set
update.m.type = MutationRef::Type::SetValue;
}
mutationsGenerated++;
bytesGenerated += 17 + 2 * startKey.size();
mutationStream.send(update);
// simulate multiple mutations with same version (TODO: this should be possible right)
if (deterministicRandom()->random01() < 0.4) {
version++;
}
if (mutationsGenerated % 1000 == 0) {
wait(yield());
}
}
printf("Fake range feed generated %d mutations at version %lld\n", mutationsGenerated, version);
choose {
when(wait(delay(1.0))) {
// slightly slower than real versions, to try to ensure it doesn't get ahead
version += 950000;
}
when(Version _v = waitNext(snapshotVersions.getFuture())) {
if (_v > version) {
printf("updating fake range feed from %lld to snapshot version %lld\n", version, _v);
version = _v;
} else {
printf("snapshot version %lld was ahead of fake range feed version %lld, keeping fake version\n",
_v,
version);
}
}
}
}
}
// TODO MOVE ELSEWHERE
ACTOR Future<Void> blobManagerPoc(RatekeeperData* self) {
state Future<Void> currentWorker;
state Future<Void> currentRangeFeed;
state Reference<S3BlobStoreEndpoint> bstore;
state std::string bucket = SERVER_KNOBS->BG_BUCKET;
printf("Initializing blob manager s3 stuff\n");
try {
printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str());
bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL);
printf("checking if bucket %s exists\n", bucket.c_str());
bool bExists = wait(bstore->bucketExists(bucket));
if (!bExists) {
printf("Bucket %s does not exist!\n", bucket.c_str());
return Void();
}
} catch (Error& e) {
printf("Blob manager got s3 init error %s\n", e.name());
return Void();
}
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
printf("Blob manager checking for range updates\n");
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// TODO probably knobs here? This should always be pretty small though
RangeResult results = wait(krmGetRanges(
tr, blobRangeKeys.begin, KeyRange(allKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
bool foundRange = false;
for (int i = 0; i < results.size() - 1; i++) {
if (results[i].value == LiteralStringRef("1")) {
ASSERT(!foundRange); // TODO for now only assume at most 1 range is set
ASSERT(results[i + 1].value == StringRef());
printf("Blob manager sees range [%s - %s)\n",
results[i].key.printable().c_str(),
results[i + 1].key.printable().c_str());
// TODO better way to make standalone to copy the keys into?
self->granuleRange = KeyRangeRef(results[i].key, results[i + 1].key);
self->granuleMetadata = makeReference<GranuleMetadata>();
foundRange = true;
PromiseStream<MutationAndVersion> mutationStream;
PromiseStream<Version> snapshotVersions;
currentRangeFeed =
fakeRangeFeed(mutationStream, snapshotVersions, results[i].key, results[i + 1].key);
currentWorker = blobWorker(self,
self->granuleMetadata,
bstore,
bucket,
mutationStream,
snapshotVersions,
results[i].key,
results[i + 1].key);
}
}
state Future<Void> watchFuture = tr->watch(blobRangeChangeKey);
wait(tr->commit());
wait(watchFuture);
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
}
static void handleBlobGranuleFileRequest(RatekeeperData* self, const BlobGranuleFileRequest& req) {
if (!self->granuleMetadata.isValid()) {
printf("No blob granule found, skipping request\n");
@ -1766,7 +1607,7 @@ static void handleBlobGranuleFileRequest(RatekeeperData* self, const BlobGranule
return;
}
BlobGranuleFileReply rep;
// TODO could check for intersection, but just assume it intersects for now
// TODO could check for intersection, but just assume it intersects all files+mutations for now
// copy everything into reply's arena
BlobGranuleChunk chunk;
@ -1820,6 +1661,258 @@ static void handleBlobGranuleFileRequest(RatekeeperData* self, const BlobGranule
req.reply.send(rep);
}
// TODO list of key ranges in the future
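// Registers a blob worker: persists its interface under blobWorkerListKeys and assigns keyRange to it in the
// blobGranuleMapping keyspace.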
ACTOR Future<Void> registerBlobWorker(RatekeeperData* self, BlobWorkerInterface interf, KeyRange keyRange) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
try {
Key blobWorkerListKey = blobWorkerListKeyFor(interf.id());
tr->addReadConflictRange(singleKeyRange(blobWorkerListKey));
tr->set(blobWorkerListKey, blobWorkerListValue(interf));
wait(krmSetRangeCoalescing(tr,
blobGranuleMappingKeys.begin,
keyRange,
KeyRange(allKeys),
blobGranuleMappingValueFor(interf.id())));
wait(tr->commit());
printf("Registered blob worker %s for key range [%s - %s)\n",
interf.id().toString().c_str(),
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
return Void();
} catch (Error& e) {
printf("Registering blob worker %s for key range [%s - %s) got error %s\n",
interf.id().toString().c_str(),
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
e.name());
wait(tr->onError(e));
}
}
}
// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure?
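// Top-level actor for a single blob worker: registers the worker, starts the file updater, and serves
// BlobGranuleFileRequests until it fails.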
ACTOR Future<Void> blobWorkerCore(RatekeeperData* self,
Reference<GranuleMetadata> metadata,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
PromiseStream<MutationAndVersion> rangeFeed,
PromiseStream<Version> snapshotVersions,
Key startKey,
Key endKey,
LocalityData locality) {
printf("Blob worker starting for [%s - %s) for bucket %s\n",
startKey.printable().c_str(),
endKey.printable().c_str(),
bucket.c_str());
state BlobWorkerInterface interf(locality, deterministicRandom()->randomUniqueID());
interf.initEndpoints();
wait(registerBlobWorker(self, interf, KeyRange(KeyRangeRef(startKey, endKey))));
state PromiseStream<Future<Void>> addActor;
state Future<Void> collection = actorCollection(addActor.getFuture());
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
// start file updater
addActor.send(blobWorkerUpdateFiles(self, metadata, bstore, bucket, rangeFeed, snapshotVersions, startKey, endKey));
try {
loop choose {
when(BlobGranuleFileRequest req = waitNext(interf.blobGranuleFileRequest.getFuture())) {
printf("Got blob granule request [%s - %s)\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
handleBlobGranuleFileRequest(self, req);
}
when(wait(collection)) {
ASSERT(false);
throw internal_error();
}
}
} catch (Error& e) {
printf("Blob worker got error %s, exiting\n", e.name());
TraceEvent("BlobWorkerDied", interf.id()).error(e, true);
}
return Void();
}
// dumb series of mutations that just sets/clears the same key, unrelated to actual db transactions
ACTOR Future<Void> fakeRangeFeed(PromiseStream<MutationAndVersion> mutationStream,
PromiseStream<Version> snapshotVersions,
Key startKey,
Key endKey) {
state Version version = waitNext(snapshotVersions.getFuture());
state uint32_t targetKbPerSec = (uint32_t)(SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES) / 10;
state Arena arena;
printf("Fake range feed got initial version %lld\n", version);
loop {
// dumb series of mutations that just sets/clears the same key, unrelated to actual db transactions
state uint32_t bytesGenerated = 0;
state uint32_t targetKbThisSec =
targetKbPerSec / 2 + (uint32_t)(deterministicRandom()->random01() * targetKbPerSec);
state uint32_t mutationsGenerated = 0;
while (bytesGenerated < targetKbThisSec) {
MutationAndVersion update;
update.v = version;
update.m.param1 = startKey;
update.m.param2 = startKey;
if (deterministicRandom()->random01() < 0.5) {
// clear start key
update.m.type = MutationRef::Type::ClearRange;
} else {
// set
update.m.type = MutationRef::Type::SetValue;
}
mutationsGenerated++;
bytesGenerated += 17 + 2 * startKey.size();
mutationStream.send(update);
// simulate multiple mutations with same version (TODO: this should be possible right)
if (deterministicRandom()->random01() < 0.4) {
version++;
}
if (mutationsGenerated % 1000 == 0) {
wait(yield());
}
}
// printf("Fake range feed generated %d mutations at version %lld\n", mutationsGenerated, version);
choose {
when(wait(delay(1.0))) {
// slightly slower than real versions, to try to ensure it doesn't get ahead
version += 950000;
}
when(Version _v = waitNext(snapshotVersions.getFuture())) {
if (_v > version) {
printf("updating fake range feed from %lld to snapshot version %lld\n", version, _v);
version = _v;
} else {
printf("snapshot version %lld was ahead of fake range feed version %lld, keeping fake version\n",
_v,
version);
}
}
}
}
}
// TODO REMOVE eventually
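// Clears any existing blob worker registrations and granule mappings so the PoC manager starts from a clean slate.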
ACTOR Future<Void> nukeBlobWorkerData(RatekeeperData* self) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
loop {
try {
tr->clear(blobWorkerListKeys);
tr->clear(blobGranuleMappingKeys);
wait(tr->commit());
return Void();
} catch (Error& e) {
printf("Nuking blob worker data got error %s\n", e.name());
wait(tr->onError(e));
}
}
}
// TODO MOVE ELSEWHERE
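// Proof-of-concept blob manager: nukes any stale blob worker state on startup, then watches the blob range
// configuration and spins up a blob worker (plus a fake range feed) for the configured range.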
ACTOR Future<Void> blobManagerPoc(RatekeeperData* self, LocalityData locality) {
state Future<Void> currentWorker;
state Future<Void> currentRangeFeed;
state Reference<S3BlobStoreEndpoint> bstore;
state std::string bucket = SERVER_KNOBS->BG_BUCKET;
printf("Initializing blob manager s3 stuff\n");
try {
printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str());
bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL);
printf("checking if bucket %s exists\n", bucket.c_str());
bool bExists = wait(bstore->bucketExists(bucket));
if (!bExists) {
printf("Bucket %s does not exist!\n", bucket.c_str());
return Void();
}
} catch (Error& e) {
printf("Blob manager got s3 init error %s\n", e.name());
return Void();
}
// TODO remove once we have persistence + failure detection
// TODO move to separate actor
printf("Blob manager nuking previous workers and range assignments on startup\n");
wait(nukeBlobWorkerData(self));
printf("Blob manager nuked previous workers and range assignments\n");
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
printf("Blob manager checking for range updates\n");
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// TODO probably knobs here? This should always be pretty small though
RangeResult results = wait(krmGetRanges(
tr, blobRangeKeys.begin, KeyRange(allKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
bool foundRange = false;
for (int i = 0; i < results.size() - 1; i++) {
if (results[i].value == LiteralStringRef("1")) {
ASSERT(!foundRange); // TODO for now only assume at most 1 range is set
ASSERT(results[i + 1].value == StringRef());
printf("Blob manager sees range [%s - %s)\n",
results[i].key.printable().c_str(),
results[i + 1].key.printable().c_str());
// TODO better way to make standalone to copy the keys into?
self->granuleRange = KeyRangeRef(results[i].key, results[i + 1].key);
self->granuleMetadata = makeReference<GranuleMetadata>();
foundRange = true;
PromiseStream<MutationAndVersion> mutationStream;
PromiseStream<Version> snapshotVersions;
currentRangeFeed =
fakeRangeFeed(mutationStream, snapshotVersions, results[i].key, results[i + 1].key);
currentWorker = blobWorkerCore(self,
self->granuleMetadata,
bstore,
bucket,
mutationStream,
snapshotVersions,
results[i].key,
results[i + 1].key,
locality);
}
}
if (!foundRange) {
// cancel current one
printf("Blob manager found zero blob ranges, cancelling current worker and fake range feed!\n");
currentRangeFeed = Never();
currentWorker = Never();
}
state Future<Void> watchFuture = tr->watch(blobRangeChangeKey);
wait(tr->commit());
printf("Blob manager awaiting new range update\n");
wait(watchFuture);
break;
} catch (Error& e) {
printf("Blob manager got error looking for range updates %s\n", e.name());
wait(tr->onError(e));
}
}
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
@ -1849,7 +1942,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
printf("Starting blob manager with url=%s and bucket=%s\n",
SERVER_KNOBS->BG_URL.c_str(),
SERVER_KNOBS->BG_BUCKET.c_str());
self.addActor.send(blobManagerPoc(&self));
self.addActor.send(blobManagerPoc(&self, rkInterf.locality));
}
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
@ -1944,12 +2037,6 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
updateCommitCostEstimation(&self, req.ssTrTagCommitCost);
req.reply.send(Void());
}
when(BlobGranuleFileRequest req = waitNext(rkInterf.blobGranuleFileRequest.getFuture())) {
printf("Got blob granule request [%s - %s)\n",
req.keyRange.begin.printable().c_str(),
req.keyRange.end.printable().c_str());
handleBlobGranuleFileRequest(&self, req);
}
when(wait(err.getFuture())) {}
when(wait(dbInfo->onChange())) {
if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) {


@ -32,8 +32,6 @@ struct RatekeeperInterface {
RequestStream<struct GetRateInfoRequest> getRateInfo;
RequestStream<struct HaltRatekeeperRequest> haltRatekeeper;
RequestStream<struct ReportCommitCostEstimationRequest> reportCommitCostEstimation;
// TODO REMOVE!!!
RequestStream<struct BlobGranuleFileRequest> blobGranuleFileRequest;
struct LocalityData locality;
UID myId;
@ -48,14 +46,7 @@ struct RatekeeperInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar,
waitFailure,
getRateInfo,
haltRatekeeper,
reportCommitCostEstimation,
blobGranuleFileRequest,
locality,
myId);
serializer(ar, waitFailure, getRateInfo, haltRatekeeper, reportCommitCostEstimation, locality, myId);
}
};
@ -149,63 +140,4 @@ struct ReportCommitCostEstimationRequest {
}
};
// TODO MOVE ELSEWHERE
struct MutationAndVersion {
constexpr static FileIdentifier file_identifier = 4268041;
MutationRef m;
Version v;
MutationAndVersion() {}
MutationAndVersion(Arena& to, MutationRef m, Version v) : m(to, m), v(v) {}
MutationAndVersion(Arena& to, const MutationAndVersion& from) : m(to, from.m), v(from.v) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, m, v);
}
};
// the assumption of this response is
struct BlobGranuleChunk {
KeyRangeRef keyRange;
StringRef snapshotFileName;
VectorRef<StringRef> deltaFileNames;
VectorRef<MutationAndVersion> newDeltas;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, snapshotFileName, deltaFileNames, newDeltas);
}
};
struct BlobGranuleFileReply {
// TODO "proper" way to generate file_identifier?
constexpr static FileIdentifier file_identifier = 6858612;
Arena arena;
VectorRef<BlobGranuleChunk> chunks;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, chunks, arena);
}
};
// TODO could do a reply promise stream of file mutations to bound memory requirements?
// Have to load whole snapshot file into memory though so it doesn't actually matter too much
struct BlobGranuleFileRequest {
constexpr static FileIdentifier file_identifier = 4150141;
Arena arena;
KeyRangeRef keyRange;
Version readVersion;
ReplyPromise<BlobGranuleFileReply> reply;
BlobGranuleFileRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, readVersion, reply, arena);
}
};
#endif // FDBSERVER_RATEKEEPERINTERFACE_H


@ -2110,13 +2110,13 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
"TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion");
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
//req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
//"None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) {
// TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin",
//req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin",
//shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin",
// shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
throw wrong_shard_server();
}
@ -2193,10 +2193,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
}
/*for( int i = 0; i < r.data.size(); i++ ) {
StorageMetrics m;
m.bytesPerKSecond = r.data[i].expectedSize();
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
data->metrics.notify(r.data[i].key, m);
StorageMetrics m;
m.bytesPerKSecond = r.data[i].expectedSize();
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an
int data->metrics.notify(r.data[i].key, m);
}*/
// For performance concerns, the cost of a range read is billed to the start key and end key of the


@ -0,0 +1,7 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/blobgranule/


@ -43,6 +43,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES BackupContainers.txt IGNORE)
add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE)
add_fdb_test(TEST_FILES BigInsert.txt IGNORE)
add_fdb_test(TEST_FILES BlobGranuleReaderUnit.txt) # TODO re-ignore!
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)