diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp new file mode 100644 index 0000000000..30528b6581 --- /dev/null +++ b/fdbclient/BlobGranuleReader.actor.cpp @@ -0,0 +1,437 @@ +/* + * BlobGranuleReader.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/AsyncFileS3BlobStore.actor.h" +#include "fdbclient/Atomic.h" +#include "fdbclient/BlobGranuleReader.actor.h" +#include "fdbclient/SystemData.h" // for allKeys unit test - could remove +#include "flow/UnitTest.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +// TODO more efficient data structure besides std::map? PTree is unecessary since this isn't versioned, but some other +// sorted thing could work. And if it used arenas it'd probably be more efficient with allocations, since everything +// else is in 1 arena and discarded at the end. + +// TODO could refactor the like 6 lines of file reading code from here and the delta file function into another actor, +// then this part would also be testable? but meh +ACTOR Future readSnapshotFile(Reference bstore, + std::string bucket, + std::string filename, + Arena arena, + KeyRangeRef keyRange, + std::map* dataMap) { + state AsyncFileS3BlobStoreRead reader(bstore, bucket, filename); + state int64_t size = wait(reader.size()); + state uint8_t* data = new (arena) uint8_t[size]; + printf("Reading %d bytes from snapshot file %s\n", size, filename.c_str()); + int readSize = wait(reader.read(data, size, 0)); + printf("Read %d bytes from snapshot file %s\n", readSize, filename.c_str()); + ASSERT(size == readSize); + + StringRef dataRef(data, size); + + GranuleSnapshot snapshot = ObjectReader::fromStringRef(dataRef, Unversioned()); + printf("Parsed %d rows from snapshot file %s\n", snapshot.size(), filename.c_str()); + int i = 0; + while (i < snapshot.size() && snapshot[i].key < keyRange.begin) { + i++; + } + while (i < snapshot.size() && snapshot[i].key < keyRange.end) { + dataMap->insert({ snapshot[i].key, snapshot[i].value }); + } + printf("Started with %d rows from snapshot file %s\n", dataMap->size(), filename.c_str()); + + return Void(); +} + +ACTOR Future readDeltaFile(Reference bstore, + std::string bucket, + std::string filename, + Arena arena, + KeyRangeRef keyRange, + Version readVersion) { + state AsyncFileS3BlobStoreRead reader(bstore, bucket, filename); + state int64_t size = wait(reader.size()); + state uint8_t* data = new (arena) uint8_t[size]; + printf("Reading %d bytes from delta file %s\n", size, filename.c_str()); + int readSize = wait(reader.read(data, size, 0)); + printf("Read %d bytes from delta file %s\n", readSize, filename.c_str()); + ASSERT(size == readSize); + + // Don't do range or version filtering in here since we'd have to copy/rewrite the deltas and it might starve + // snapshot read task, do it in main thread + + 
StringRef dataRef(data, size); + + GranuleDeltas deltas = ObjectReader::fromStringRef(dataRef, Unversioned()); + printf("Parsed %d deltas from delta file %s\n", deltas.size(), filename.c_str()); + return deltas; +} + +// TODO unit test this! + +// TODO this giant switch is mostly lifted from storage server. +// Could refactor atomics to have a generic "handle this atomic mutation" thing instead of having to duplicate code with +// the switch statement everywhere? +static void applyDelta(std::map* dataMap, Arena& ar, KeyRangeRef keyRange, MutationRef m) { + if (m.type == MutationRef::ClearRange) { + if (m.param2 <= keyRange.begin || m.param1 >= keyRange.end) { + return; + } + // keyRange is inclusive on start, lower_bound is inclusive with the argument, and erase is inclusive for the + // begin. So if lower bound didn't find the exact key, we need to go up one so it doesn't erase an extra key + // outside the range. + std::map::iterator itStart = dataMap->lower_bound(m.param1); + if (itStart != dataMap->end() && itStart->first < m.param1) { + itStart++; + } + + // keyRange is exclusive on end, lower bound is inclusive with the argument, and erase is exclusive for the end + // key. So if lower bound didn't find the exact key, we need to go up one so it doesn't skip the last key it + // should erase + std::map::iterator itEnd = dataMap->lower_bound(m.param2); + if (itEnd != dataMap->end() && itEnd->first < m.param2) { + itEnd++; + } + dataMap->erase(itStart, itEnd); + } else { + if (m.param1 < keyRange.begin || m.param1 >= keyRange.end) { + return; + } + std::map::iterator it = dataMap->find(m.param1); + if (m.type != MutationRef::SetValue) { + Optional oldVal; + if (it != dataMap->end()) { + oldVal = it->second; + } + + switch (m.type) { + case MutationRef::AddValue: + m.param2 = doLittleEndianAdd(oldVal, m.param2, ar); + break; + case MutationRef::And: + m.param2 = doAnd(oldVal, m.param2, ar); + break; + case MutationRef::Or: + m.param2 = doOr(oldVal, m.param2, ar); + break; + case MutationRef::Xor: + m.param2 = doXor(oldVal, m.param2, ar); + break; + case MutationRef::AppendIfFits: + m.param2 = doAppendIfFits(oldVal, m.param2, ar); + break; + case MutationRef::Max: + m.param2 = doMax(oldVal, m.param2, ar); + break; + case MutationRef::Min: + m.param2 = doMin(oldVal, m.param2, ar); + break; + case MutationRef::ByteMin: + m.param2 = doByteMin(oldVal, m.param2, ar); + break; + case MutationRef::ByteMax: + m.param2 = doByteMax(oldVal, m.param2, ar); + break; + case MutationRef::MinV2: + m.param2 = doMinV2(oldVal, m.param2, ar); + break; + case MutationRef::AndV2: + m.param2 = doAndV2(oldVal, m.param2, ar); + break; + case MutationRef::CompareAndClear: + if (oldVal.present() && m.param2 == oldVal.get()) { + m.type = MutationRef::ClearRange; + m.param2 = keyAfter(m.param1, ar); + applyDelta(dataMap, ar, keyRange, m); + }; + return; + } + } + if (it == dataMap->end()) { + dataMap->insert({ m.param1, m.param2 }); + } else { + it->second = m.param2; + } + } +} + +// TODO might want to change this to an actor so it can yield periodically? 
+static void applyDeltas(std::map* dataMap, + Arena& arena, + GranuleDeltas deltas, + KeyRangeRef keyRange, + Version readVersion) { + for (MutationAndVersion& delta : deltas) { + if (delta.v > readVersion) { + break; + } + applyDelta(dataMap, arena, keyRange, delta.m); + } +} + +ACTOR Future readBlobGranule(BlobGranuleChunk chunk, + KeyRangeRef keyRange, + Version readVersion, + Reference bstore, + std::string bucket) { + // arena to hold all stuff for parsing and updaing mutations. Most of it will likely be tossed, so we copy at the + // end instead of doing a dependsOn + state Arena arena; + + state std::map dataMap; + Future readSnapshotFuture = + readSnapshotFile(bstore, bucket, chunk.snapshotFileName.toString(), arena, keyRange, &dataMap); + state std::vector> readDeltaFutures; + readDeltaFutures.resize(chunk.deltaFileNames.size()); + for (StringRef deltaFileName : chunk.deltaFileNames) { + readDeltaFutures.push_back( + readDeltaFile(bstore, bucket, deltaFileName.toString(), arena, keyRange, readVersion)); + } + + wait(readSnapshotFuture); + for (Future deltaFuture : readDeltaFutures) { + GranuleDeltas result = wait(deltaFuture); + applyDeltas(&dataMap, arena, result, keyRange, readVersion); + wait(yield()); + } + printf("Applying %d memory deltas\n", chunk.newDeltas.size()); + applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion); + wait(yield()); + + RangeResult ret; + for (auto& it : dataMap) { + ret.push_back_deep(ret.arena(), KeyValueRef(it.first, it.second)); + // TODO for large reads, probably wait to yield periodically here for slowTask + } + printf("Final processing ended up with %d rows\n", ret.size()); + + return ret; +} + +// TODO probably should add things like limit/bytelimit at some point? +ACTOR Future readBlobGranules(BlobGranuleFileRequest request, + BlobGranuleFileReply reply, + Reference bstore, + std::string bucket, + PromiseStream results) { + // TODO for large amount of chunks, this should probably have some sort of buffer limit like ReplyPromiseStream. + // Maybe just use ReplyPromiseStream instead of PromiseStream? 
+ try { + state int i; + for (i = 0; i < reply.chunks.size(); i++) { + printf("ReadBlobGranules processing chunk %d [%s - %s)\n", + i, + reply.chunks[i].keyRange.begin.printable().c_str(), + reply.chunks[i].keyRange.end.printable().c_str()); + RangeResult chunkResult = + wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore, bucket)); + results.send(std::move(chunkResult)); + } + printf("ReadBlobGranules done, sending EOS\n"); + results.sendError(end_of_stream()); + } catch (Error& e) { + printf("ReadBlobGranules got error %s\n", e.name()); + results.sendError(e); + } + + return Void(); +} + +TEST_CASE("/blobgranule/reader/applyDelta") { + printf("Testing blob granule deltas\n"); + Arena a; + + // do this 2 phase arena creation of string refs instead of LiteralStringRef because there is no char* StringRef + // constructor, and valgrind might complain if the stringref data isn't in the arena + std::string sk_a = "A"; + std::string sk_ab = "AB"; + std::string sk_b = "B"; + std::string sk_c = "C"; + std::string sk_z = "Z"; + std::string sval1 = "1"; + std::string sval2 = "2"; + + StringRef k_a = StringRef(a, sk_a); + StringRef k_ab = StringRef(a, sk_ab); + StringRef k_b = StringRef(a, sk_b); + StringRef k_c = StringRef(a, sk_c); + StringRef k_z = StringRef(a, sk_z); + StringRef val1 = StringRef(a, sval1); + StringRef val2 = StringRef(a, sval2); + + std::map data; + data.insert({ k_a, val1 }); + data.insert({ k_ab, val1 }); + data.insert({ k_b, val1 }); + + std::map correctData = data; + std::map originalData = data; + + ASSERT(data == correctData); + + // test all clear permutations + + MutationRef mClearEverything(MutationRef::ClearRange, allKeys.begin, allKeys.end); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearEverything); + correctData.clear(); + ASSERT(data == correctData); + + MutationRef mClearEverything2(MutationRef::ClearRange, allKeys.begin, k_c); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearEverything2); + correctData.clear(); + ASSERT(data == correctData); + + MutationRef mClearEverything3(MutationRef::ClearRange, k_a, allKeys.end); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearEverything3); + correctData.clear(); + ASSERT(data == correctData); + + MutationRef mClearEverything4(MutationRef::ClearRange, k_a, k_c); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearEverything4); + correctData.clear(); + ASSERT(data == correctData); + + MutationRef mClearFirst(MutationRef::ClearRange, k_a, k_ab); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearFirst); + correctData.erase(k_a); + ASSERT(data == correctData); + + MutationRef mClearSecond(MutationRef::ClearRange, k_ab, k_b); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearSecond); + correctData.erase(k_ab); + ASSERT(data == correctData); + + MutationRef mClearThird(MutationRef::ClearRange, k_b, k_c); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearThird); + correctData.erase(k_b); + ASSERT(data == correctData); + + MutationRef mClearFirst2(MutationRef::ClearRange, k_a, k_b); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearFirst2); + correctData.erase(k_a); + correctData.erase(k_ab); + ASSERT(data == correctData); + + MutationRef mClearLast2(MutationRef::ClearRange, 
k_ab, k_c); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mClearLast2); + correctData.erase(k_ab); + correctData.erase(k_b); + ASSERT(data == correctData); + + // test set data + MutationRef mSetA(MutationRef::SetValue, k_a, val2); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mSetA); + correctData[k_a] = val2; + ASSERT(data == correctData); + + MutationRef mSetAB(MutationRef::SetValue, k_ab, val2); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mSetAB); + correctData[k_ab] = val2; + ASSERT(data == correctData); + + MutationRef mSetB(MutationRef::SetValue, k_b, val2); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mSetB); + correctData[k_b] = val2; + ASSERT(data == correctData); + + MutationRef mSetC(MutationRef::SetValue, k_c, val2); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mSetC); + correctData[k_c] = val2; + ASSERT(data == correctData); + + // test pruning deltas that are outside of the key range + + MutationRef mSetZ(MutationRef::SetValue, k_z, val2); + data = originalData; + applyDelta(&data, a, KeyRangeRef(k_a, k_c), mSetZ); + ASSERT(data == originalData); + + applyDelta(&data, a, KeyRangeRef(k_ab, k_c), mSetA); + ASSERT(data == originalData); + + applyDelta(&data, a, KeyRangeRef(k_ab, k_c), mClearFirst); + ASSERT(data == originalData); + + applyDelta(&data, a, KeyRangeRef(k_a, k_ab), mClearThird); + ASSERT(data == originalData); + + // Could test all other atomic ops, but if set, max, and compare+clear works, and the others all just directly call + // the atomics, there is little to test + + MutationRef mCAndC1(MutationRef::CompareAndClear, k_a, val1); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mCAndC1); + correctData.erase(k_a); + ASSERT(data == correctData); + + MutationRef mCAndC2(MutationRef::CompareAndClear, k_a, val2); + data = originalData; + applyDelta(&data, a, allKeys, mCAndC2); + ASSERT(data == originalData); + + MutationRef mCAndCZ(MutationRef::CompareAndClear, k_z, val2); + data = originalData; + applyDelta(&data, a, allKeys, mCAndCZ); + ASSERT(data == originalData); + + MutationRef mMaxA(MutationRef::ByteMax, k_a, val2); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mMaxA); + correctData[k_a] = val2; + ASSERT(data == correctData); + + MutationRef mMaxC(MutationRef::ByteMax, k_c, val2); + data = originalData; + correctData = originalData; + applyDelta(&data, a, allKeys, mMaxC); + correctData[k_c] = val2; + ASSERT(data == correctData); + + return Void(); +} \ No newline at end of file diff --git a/fdbclient/BlobGranuleReader.actor.h b/fdbclient/BlobGranuleReader.actor.h new file mode 100644 index 0000000000..15b799b4c7 --- /dev/null +++ b/fdbclient/BlobGranuleReader.actor.h @@ -0,0 +1,51 @@ +/* + * BlobGranuleReader.actor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source +// version. +#if defined(NO_INTELLISENSE) && !defined(BLOB_GRANULE_READER_CLIENT_G_H) +#define BLOB_GRANULE_READER_CLIENT_G_H +#include "fdbclient/BlobGranuleReader.actor.g.h" +#elif !defined(BLOB_GRANULE_READER_CLIENT_H) +#define BLOB_GRANULE_READER_CLIENT_H + +#include "fdbclient/BlobWorkerInterface.h" +#include "fdbclient/S3BlobStore.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +// Reads the fileset in the reply using the provided blob store, amnd filters data and mutations by key + version from +// the request +ACTOR Future readBlobGranule(BlobGranuleChunk chunk, + KeyRangeRef keyRange, + Version readVersion, + Reference bstore, + std::string bucket); + +ACTOR Future readBlobGranules(BlobGranuleFileRequest request, + BlobGranuleFileReply reply, + Reference bstore, + std::string bucket, + PromiseStream results); + +#include "flow/unactorcompiler.h" +#endif diff --git a/fdbclient/BlobWorkerInterface.h b/fdbclient/BlobWorkerInterface.h new file mode 100644 index 0000000000..4d6ebe913b --- /dev/null +++ b/fdbclient/BlobWorkerInterface.h @@ -0,0 +1,134 @@ +/* + * BlobWorkerInterface.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FDBCLIENT_BLOBWORKERINTERFACE_H +#define FDBCLIENT_BLOBWORKERINTERFACE_H +#pragma once + +#include "fdbclient/CommitProxyInterface.h" +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/fdbrpc.h" +#include "fdbrpc/Locality.h" + +struct BlobWorkerInterface { + constexpr static FileIdentifier file_identifier = 8358753; + RequestStream> waitFailure; + RequestStream blobGranuleFileRequest; + struct LocalityData locality; + UID myId; + + BlobWorkerInterface() {} + explicit BlobWorkerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {} + + void initEndpoints() {} + UID id() const { return myId; } + NetworkAddress address() const { return blobGranuleFileRequest.getEndpoint().getPrimaryAddress(); } + bool operator==(const BlobWorkerInterface& r) const { return id() == r.id(); } + bool operator!=(const BlobWorkerInterface& r) const { return !(*this == r); } + + template + void serialize(Archive& ar) { + serializer(ar, waitFailure, blobGranuleFileRequest, locality, myId); + } +}; + +struct MutationAndVersion { + constexpr static FileIdentifier file_identifier = 4268041; + MutationRef m; + Version v; + + MutationAndVersion() {} + MutationAndVersion(Arena& to, MutationRef m, Version v) : m(to, m), v(v) {} + MutationAndVersion(Arena& to, const MutationAndVersion& from) : m(to, from.m), v(from.v) {} + + template + void serialize(Ar& ar) { + serializer(ar, m, v); + } +}; + +// file format of actual blob files +struct GranuleSnapshot : VectorRef { + + constexpr static FileIdentifier file_identifier = 4268040; + + template + void serialize(Ar& ar) { + serializer(ar, ((VectorRef&)*this)); + } +}; + +struct GranuleDeltas : VectorRef { + constexpr static FileIdentifier file_identifier = 4268042; + + template + void serialize(Ar& ar) { + serializer(ar, ((VectorRef&)*this)); + } +}; + +// the assumption of this response is that the client will deserialize the files and apply the mutations themselves +// TODO could filter out delta files that don't intersect the key range being requested? +// TODO since client request passes version, we don't need to include the version of each mutation in the response if we +// pruned it there +struct BlobGranuleChunk { + constexpr static FileIdentifier file_identifier = 991434; + KeyRangeRef keyRange; + StringRef snapshotFileName; + VectorRef deltaFileNames; + GranuleDeltas newDeltas; + + template + void serialize(Ar& ar) { + serializer(ar, keyRange, snapshotFileName, deltaFileNames, newDeltas); + } +}; + +struct BlobGranuleFileReply { + // TODO is there a "proper" way to generate file_identifier? + constexpr static FileIdentifier file_identifier = 6858612; + Arena arena; + VectorRef chunks; + + template + void serialize(Ar& ar) { + serializer(ar, chunks, arena); + } +}; + +// TODO could do a reply promise stream of file mutations to bound memory requirements? 
+// Have to load whole snapshot file into memory though so it doesn't actually matter too much +struct BlobGranuleFileRequest { + + constexpr static FileIdentifier file_identifier = 4150141; + Arena arena; + KeyRangeRef keyRange; + Version readVersion; + ReplyPromise reply; + + BlobGranuleFileRequest() {} + + template + void serialize(Ar& ar) { + serializer(ar, keyRange, readVersion, reply, arena); + } +}; + +#endif \ No newline at end of file diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index 7f51be87ba..1f5dbfeb9e 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -17,6 +17,9 @@ set(FDBCLIENT_SRCS BackupContainerS3BlobStore.h ClientBooleanParams.cpp ClientBooleanParams.h + BlobWorkerInterface.h + BlobGranuleReader.actor.cpp + BlobGranuleReader.actor.h ClientKnobCollection.cpp ClientKnobCollection.h ClientKnobs.cpp diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 7cace4ca21..78d82e703b 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1039,10 +1039,53 @@ const KeyRangeRef configClassKeys("\xff\xff/configClasses/"_sr, "\xff\xff/config const KeyRef blobRangeChangeKey = LiteralStringRef("\xff\x02/blobRangeChange"); const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff/blobRange/"), LiteralStringRef("\xff/blobRange0")); -// TODO need something for tracking "current" granules - // blob range file data -const KeyRangeRef blobGranuleKeys(LiteralStringRef("\xff/blobGranules/"), LiteralStringRef("\xff/blobGranules0")); +const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff/blobGranuleFiles/"), + LiteralStringRef("\xff/blobGranuleFiles0")); + +const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff/blobGranuleMapping/"), + LiteralStringRef("\xff/blobGranuleMapping0")); + +const Value blobGranuleMappingValueFor(UID const& workerID) { + BinaryWriter wr(Unversioned()); + wr << workerID; + return wr.toValue(); +} + +UID decodeBlobGranuleMappingValue(ValueRef const& value) { + UID workerID; + BinaryReader reader(value, Unversioned()); + reader >> workerID; + return workerID; +} + +const KeyRangeRef blobWorkerListKeys(LiteralStringRef("\xff/blobWorkerList/"), + LiteralStringRef("\xff/blobWorkerList0")); + +const Key blobWorkerListKeyFor(UID workerID) { + BinaryWriter wr(Unversioned()); + wr.serializeBytes(blobWorkerListKeys.begin); + wr << workerID; + return wr.toValue(); +} + +const Value blobWorkerListValue(BlobWorkerInterface const& worker) { + return ObjectWriter::toValue(worker, IncludeVersion()); +} + +UID decodeBlobWorkerListKey(KeyRef const& key) { + UID workerID; + BinaryReader reader(key.removePrefix(blobWorkerListKeys.begin), Unversioned()); + reader >> workerID; + return workerID; +} + +BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) { + BlobWorkerInterface interf; + ObjectReader reader(value.begin(), IncludeVersion()); + reader.deserialize(interf); + return interf; +} // for tests void testSSISerdes(StorageServerInterface const& ssi, bool useFB) { diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 883797990f..9755a3d541 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -25,6 +25,7 @@ // Functions and constants documenting the organization of the reserved keyspace in the database beginning with "\xFF" #include "fdbclient/FDBTypes.h" +#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h #include "fdbclient/StorageServerInterface.h" // Don't warn on constants 
being defined in this file. @@ -507,8 +508,26 @@ extern const KeyRangeRef blobRangeKeys; // blob granule keys -// \xff/blobGranule/(startKey, endKey, {snapshot,delta}, version) = [[filename]] -extern const KeyRangeRef blobGranuleKeys; +// TODO probably want to add key range of file here so we can prune requests +// TODO this also technically means clients don't have to go through the blob worker for time-travel reads if they're +// older than the last delta file. +// \xff/blobGranuleFiles/(startKey, endKey, {snapshot|delta}, version) = [[filename]] +extern const KeyRangeRef blobGranuleFileKeys; + +// TODO could shrink the size of this keyspace by using something similar to tags instead of UIDs +// \xff/blobGranuleMapping/[[begin]] = [[BlobWorkerID]] +extern const KeyRangeRef blobGranuleMappingKeys; + +const Value blobGranuleMappingValueFor(UID const& workerID); +UID decodeBlobGranuleMappingValue(ValueRef const& value); + +// \xff/blobWorkerList/[[BlobWorkerID]] = [[BlobWorkerInterface]] +extern const KeyRangeRef blobWorkerListKeys; + +const Key blobWorkerListKeyFor(UID workerID); +const Value blobWorkerListValue(BlobWorkerInterface const& interface); +UID decodeBlobWorkerListKey(KeyRef const& key); +BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value); #pragma clang diagnostic pop diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index b4abe89e9b..30428de96e 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -42,6 +42,8 @@ #include "fdbserver/Status.h" #include "fdbserver/LatencyBandConfig.h" #include "fdbclient/DatabaseContext.h" +#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE +#include "fdbclient/BlobGranuleReader.actor.h" // TODO REMOVE #include "fdbclient/GlobalConfig.actor.h" #include "fdbserver/RecoveryState.h" #include "fdbclient/ReadYourWrites.h" @@ -3089,43 +3091,134 @@ public: // TODO REMOVE!!!! 
ACTOR Future doBlobGranuleRequests(ClusterControllerData* self, RatekeeperInterface interf) { + state std::string bucket = SERVER_KNOBS->BG_BUCKET; + state Reference bstore; + + printf("Initializing CC s3 stuff\n"); + try { + printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str()); + bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL); + printf("checking if bucket %s exists\n", bucket.c_str()); + bool bExists = wait(bstore->bucketExists(bucket)); + if (!bExists) { + printf("Bucket %s does not exist!\n", bucket.c_str()); + return Void(); + } + } catch (Error& e) { + printf("CC got s3 init error %s\n", e.name()); + return Void(); + } + + state std::unordered_map workerInterfaceCache; + state int i; loop { try { state Reference tr = makeReference(self->cx); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02"))); state Version v = wait(tr->getReadVersion()); if (deterministicRandom()->random01() < 0.3) { v -= 5000000; } else if (deterministicRandom()->random01() < 0.3) { v -= 30000000; } - printf("Doing blob granule request @ %lld\n", v); - BlobGranuleFileRequest req; - req.keyRange = KeyRangeRef(StringRef(req.arena, allKeys.begin), StringRef(req.arena, allKeys.end)); - req.readVersion = v; - BlobGranuleFileReply rep = wait(interf.blobGranuleFileRequest.getReply(req)); - printf("Blob granule request got reply:\n"); - for (auto& chunk : rep.chunks) { - printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str()); - printf(" SnapshotFile:\n %s\n", chunk.snapshotFileName.toString().c_str()); - printf(" DeltaFiles:\n"); - for (auto& df : chunk.deltaFileNames) { - printf(" %s\n", df.toString().c_str()); - } - printf(" Deltas: (%d)", chunk.newDeltas.size()); - if (chunk.newDeltas.size() > 0) { - printf(" with version [%lld - %lld]", - chunk.newDeltas[0].v, - chunk.newDeltas[chunk.newDeltas.size() - 1].v); - } - printf("\n\n"); + state RangeResult blobGranuleMapping = wait( + krmGetRanges(tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); + ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY); + + if (blobGranuleMapping.size() == 0) { + printf("no blob worker assignments yet \n"); + throw transaction_too_old(); } + printf("Doing blob granule request @ %lld\n", v); + + printf("blob worker assignments:\n"); + for (i = 0; i < blobGranuleMapping.size() - 1; i++) { + state Key granuleStartKey = blobGranuleMapping[i].key; + state Key granuleEndKey = blobGranuleMapping[i + 1].key; + if (!blobGranuleMapping[i].value.size()) { + printf("Key range [%s - %s) missing worker assignment!\n", + granuleStartKey.printable().c_str(), + granuleEndKey.printable().c_str()); + // TODO probably new exception type instead + throw transaction_too_old(); + } + + state UID workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value); + printf(" [%s - %s): %s\n", + granuleStartKey.printable().c_str(), + granuleEndKey.printable().c_str(), + workerId.toString().c_str()); + + if (i == 0) { + granuleStartKey = keyRange.begin; + } + if (i == blobGranuleMapping.size() - 2) { + granuleEndKey = keyRange.end; + } + + if (!workerInterfaceCache.count(workerId)) { + Optional workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId))); + ASSERT(workerInterface.present()); + workerInterfaceCache[workerId] = decodeBlobWorkerListValue(workerInterface.get()); + printf(" 
decoded worker interface for %s\n", workerId.toString().c_str()); + } + + state BlobGranuleFileRequest req; + req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey)); + req.readVersion = v; + ErrorOr _rep = + wait(workerInterfaceCache[workerId].blobGranuleFileRequest.tryGetReply(req)); + BlobGranuleFileReply rep = _rep.get(); + printf("Blob granule request for [%s - %s) @ %lld got reply from %s:\n", + granuleStartKey.printable().c_str(), + granuleEndKey.printable().c_str(), + v, + workerId.toString().c_str()); + for (auto& chunk : rep.chunks) { + printf("[%s - %s)\n", + chunk.keyRange.begin.printable().c_str(), + chunk.keyRange.end.printable().c_str()); + + printf(" SnapshotFile:\n %s\n", chunk.snapshotFileName.toString().c_str()); + printf(" DeltaFiles:\n"); + for (auto& df : chunk.deltaFileNames) { + printf(" %s\n", df.toString().c_str()); + } + printf(" Deltas: (%d)", chunk.newDeltas.size()); + if (chunk.newDeltas.size() > 0) { + printf(" with version [%lld - %lld]", + chunk.newDeltas[0].v, + chunk.newDeltas[chunk.newDeltas.size() - 1].v); + } + printf("\n\n"); + } + state PromiseStream results; + state Future granuleReader = readBlobGranules(req, rep, bstore, bucket, results); + try { + loop { + printf("Waiting for result chunk\n"); + RangeResult result = waitNext(results.getFuture()); + printf("Result chunk (%d):\n", result.size()); + for (auto& it : result) { + printf(" %s=%s\n", it.key.printable().c_str(), it.value.printable().c_str()); + } + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + printf("granule reader got unexpected error %s\n", e.name()); + } else { + printf("granule reader got end of stream\n"); + } + } + } } catch (Error& e) { printf("blob granule file request got error %s\n", e.name()); } - wait(delay(5.0)); + wait(delay(5.0 + 5.0 * deterministicRandom()->random01())); } } diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 9c540bfb1b..e2f9bdfca9 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -27,6 +27,7 @@ #include "fdbclient/Tuple.h" // TODO REMOVE #include "fdbclient/S3BlobStore.h" // TODO REMOVE #include "fdbclient/AsyncFileS3BlobStore.actor.h" // TODO REMOVE +#include "fdbclient/BlobWorkerInterface.h" // TODO REMOVE #include "fdbclient/TagThrottle.h" #include "fdbserver/Knobs.h" #include "fdbserver/DataDistribution.actor.h" @@ -543,26 +544,6 @@ struct GrvProxyInfo { } }; -// TODO MOVE elsewhere -struct GranuleSnapshot : VectorRef { - - constexpr static FileIdentifier file_identifier = 4268040; - - template - void serialize(Ar& ar) { - serializer(ar, ((VectorRef&)*this)); - } -}; - -struct GranuleDeltas : VectorRef { - constexpr static FileIdentifier file_identifier = 4268042; - - template - void serialize(Ar& ar) { - serializer(ar, ((VectorRef&)*this)); - } -}; - // TODO make refcounted so it can go in RatekeeperData and the blob worker struct GranuleMetadata : NonCopyable, ReferenceCounted { std::deque> snapshotFiles; @@ -1446,6 +1427,7 @@ ACTOR Future configurationMonitor(RatekeeperData* self) { } } +// TODO add granule locks ACTOR Future> writeDeltaFile(RatekeeperData* self, Reference bstore, std::string bucket, @@ -1479,7 +1461,7 @@ ACTOR Future> writeDeltaFile(RatekeeperData* sel Tuple deltaFileKey; deltaFileKey.append(startKey).append(endKey); deltaFileKey.append(LiteralStringRef("delta")).append(lastVersion); - tr->set(deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleKeys.begin), fname); + 
tr->set(deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname); wait(tr->commit()); printf("blob worker updated fdb with delta file %s at version %lld\n", fname.c_str(), lastVersion); @@ -1544,7 +1526,7 @@ ACTOR Future> dumpSnapshotFromFDB(RatekeeperData Tuple snapshotFileKey; snapshotFileKey.append(startKey).append(endKey); snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion); - tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleKeys.begin), fname); + tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname); wait(tr->commit()); printf("Blob worker committed new snapshot file for range\n"); return std::pair(readVersion, fname); @@ -1561,22 +1543,17 @@ ACTOR Future> dumpSnapshotFromFDB(RatekeeperData } } -// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure -ACTOR Future blobWorker(RatekeeperData* self, - Reference metadata, - Reference bstore, - std::string bucket, - PromiseStream rangeFeed, - PromiseStream snapshotVersions, // update fake range feed with actual versions - Key startKey, - Key endKey) { - state Arena deltaArena; - printf("Blob worker starting for [%s - %s) for bucket %s\n", - startKey.printable().c_str(), - endKey.printable().c_str(), - bucket.c_str()); +ACTOR Future blobWorkerUpdateFiles( + RatekeeperData* self, + Reference metadata, + Reference bstore, + std::string bucket, + PromiseStream rangeFeed, + PromiseStream snapshotVersions, // update fake range feed with actual versions + Key startKey, + Key endKey) { - // dump snapshot at the start + state Arena deltaArena; std::pair newSnapshotFile = wait(dumpSnapshotFromFDB(self, bstore, bucket, startKey, endKey)); metadata->snapshotFiles.push_back(newSnapshotFile); metadata->lastWriteVersion = newSnapshotFile.first; @@ -1623,142 +1600,6 @@ ACTOR Future blobWorker(RatekeeperData* self, } } -// dumb series of mutations that just sets/clears same key that is unrelated to actual db transactions -ACTOR Future fakeRangeFeed(PromiseStream mutationStream, - PromiseStream snapshotVersions, - Key startKey, - Key endKey) { - state Version version = waitNext(snapshotVersions.getFuture()); - state uint32_t targetKbPerSec = (uint32_t)(SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES) / 10; - state Arena arena; - printf("Fake range feed got initial version %lld\n", version); - loop { - // dumb series of mutations that just sets/clears same key that is unrelated to actual db transactions - state uint32_t bytesGenerated = 0; - state uint32_t targetKbThisSec = - targetKbPerSec / 2 + (uint32_t)(deterministicRandom()->random01() * targetKbPerSec); - state uint32_t mutationsGenerated = 0; - while (bytesGenerated < targetKbThisSec) { - MutationAndVersion update; - update.v = version; - update.m.param1 = startKey; - update.m.param2 = startKey; - if (deterministicRandom()->random01() < 0.5) { - // clear start key - update.m.type = MutationRef::Type::ClearRange; - } else { - // set - update.m.type = MutationRef::Type::SetValue; - } - mutationsGenerated++; - bytesGenerated += 17 + 2 * startKey.size(); - mutationStream.send(update); - - // simulate multiple mutations with same version (TODO: this should be possible right) - if (deterministicRandom()->random01() < 0.4) { - version++; - } - if (mutationsGenerated % 1000 == 0) { - wait(yield()); - } - } - - printf("Fake range feed generated %d mutations at version %lld\n", mutationsGenerated, version); - - choose { - when(wait(delay(1.0))) { - // slightly slower 
than real versions, to try to ensure it doesn't get ahead - version += 950000; - } - when(Version _v = waitNext(snapshotVersions.getFuture())) { - if (_v > version) { - printf("updating fake range feed from %lld to snapshot version %lld\n", version, _v); - version = _v; - } else { - printf("snapshot version %lld was ahead of fake range feed version %lld, keeping fake version\n", - _v, - version); - } - } - } - } -} - -// TODO MOVE ELSEWHERE -ACTOR Future blobManagerPoc(RatekeeperData* self) { - state Future currentWorker; - state Future currentRangeFeed; - state Reference bstore; - state std::string bucket = SERVER_KNOBS->BG_BUCKET; - - printf("Initializing blob manager s3 stuff\n"); - try { - printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str()); - bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL); - printf("checking if bucket %s exists\n", bucket.c_str()); - bool bExists = wait(bstore->bucketExists(bucket)); - if (!bExists) { - printf("Bucket %s does not exist!\n", bucket.c_str()); - return Void(); - } - } catch (Error& e) { - printf("Blob manager got s3 init error %s\n", e.name()); - return Void(); - } - - loop { - state Reference tr = makeReference(self->db); - - printf("Blob manager checking for range updates\n"); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - - // TODO probably knobs here? This should always be pretty small though - RangeResult results = wait(krmGetRanges( - tr, blobRangeKeys.begin, KeyRange(allKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); - ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); - - bool foundRange = false; - for (int i = 0; i < results.size() - 1; i++) { - if (results[i].value == LiteralStringRef("1")) { - ASSERT(!foundRange); // TODO for now only assume at most 1 range is set - ASSERT(results[i + 1].value == StringRef()); - printf("Blob manager sees range [%s - %s)\n", - results[i].key.printable().c_str(), - results[i + 1].key.printable().c_str()); - - // TODO better way to make standalone to copy the keys into? 
- self->granuleRange = KeyRangeRef(results[i].key, results[i + 1].key); - self->granuleMetadata = makeReference(); - foundRange = true; - PromiseStream mutationStream; - PromiseStream snapshotVersions; - currentRangeFeed = - fakeRangeFeed(mutationStream, snapshotVersions, results[i].key, results[i + 1].key); - currentWorker = blobWorker(self, - self->granuleMetadata, - bstore, - bucket, - mutationStream, - snapshotVersions, - results[i].key, - results[i + 1].key); - } - } - - state Future watchFuture = tr->watch(blobRangeChangeKey); - wait(tr->commit()); - wait(watchFuture); - break; - } catch (Error& e) { - wait(tr->onError(e)); - } - } - } -} - static void handleBlobGranuleFileRequest(RatekeeperData* self, const BlobGranuleFileRequest& req) { if (!self->granuleMetadata.isValid()) { printf("No blob granule found, skipping request\n"); @@ -1766,7 +1607,7 @@ static void handleBlobGranuleFileRequest(RatekeeperData* self, const BlobGranule return; } BlobGranuleFileReply rep; - // TODO could check for intersection, but just assume it intersects for now + // TODO could check for intersection, but just assume it intersects all files+mutations for now // copy everything into reply's arena BlobGranuleChunk chunk; @@ -1820,6 +1661,258 @@ static void handleBlobGranuleFileRequest(RatekeeperData* self, const BlobGranule req.reply.send(rep); } +// TODO list of key ranges in the future +ACTOR Future registerBlobWorker(RatekeeperData* self, BlobWorkerInterface interf, KeyRange keyRange) { + state Reference tr = makeReference(self->db); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + loop { + try { + Key blobWorkerListKey = blobWorkerListKeyFor(interf.id()); + tr->addReadConflictRange(singleKeyRange(blobWorkerListKey)); + tr->set(blobWorkerListKey, blobWorkerListValue(interf)); + + wait(krmSetRangeCoalescing(tr, + blobGranuleMappingKeys.begin, + keyRange, + KeyRange(allKeys), + blobGranuleMappingValueFor(interf.id()))); + + wait(tr->commit()); + + printf("Registered blob worker %s for key range [%s - %s)\n", + interf.id().toString().c_str(), + keyRange.begin.printable().c_str(), + keyRange.end.printable().c_str()); + return Void(); + } catch (Error& e) { + printf("Registering blob worker %s for key range [%s - %s) got error %s\n", + interf.id().toString().c_str(), + keyRange.begin.printable().c_str(), + keyRange.end.printable().c_str(), + e.name()); + wait(tr->onError(e)); + } + } +} + +// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure? 
+ACTOR Future blobWorkerCore(RatekeeperData* self, + Reference metadata, + Reference bstore, + std::string bucket, + PromiseStream rangeFeed, + PromiseStream snapshotVersions, + Key startKey, + Key endKey, + LocalityData locality) { + printf("Blob worker starting for [%s - %s) for bucket %s\n", + startKey.printable().c_str(), + endKey.printable().c_str(), + bucket.c_str()); + + state BlobWorkerInterface interf(locality, deterministicRandom()->randomUniqueID()); + interf.initEndpoints(); + + wait(registerBlobWorker(self, interf, KeyRange(KeyRangeRef(startKey, endKey)))); + + state PromiseStream> addActor; + state Future collection = actorCollection(addActor.getFuture()); + + addActor.send(waitFailureServer(interf.waitFailure.getFuture())); + // start file updater + addActor.send(blobWorkerUpdateFiles(self, metadata, bstore, bucket, rangeFeed, snapshotVersions, startKey, endKey)); + + try { + loop choose { + when(BlobGranuleFileRequest req = waitNext(interf.blobGranuleFileRequest.getFuture())) { + printf("Got blob granule request [%s - %s)\n", + req.keyRange.begin.printable().c_str(), + req.keyRange.end.printable().c_str()); + handleBlobGranuleFileRequest(self, req); + } + when(wait(collection)) { + ASSERT(false); + throw internal_error(); + } + } + } catch (Error& e) { + printf("Blob worker got error %s, exiting\n", e.name()); + TraceEvent("BlobWorkerDied", interf.id()).error(e, true); + } + + return Void(); +} + +// dumb series of mutations that just sets/clears same key that is unrelated to actual db transactions +ACTOR Future fakeRangeFeed(PromiseStream mutationStream, + PromiseStream snapshotVersions, + Key startKey, + Key endKey) { + state Version version = waitNext(snapshotVersions.getFuture()); + state uint32_t targetKbPerSec = (uint32_t)(SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES) / 10; + state Arena arena; + printf("Fake range feed got initial version %lld\n", version); + loop { + // dumb series of mutations that just sets/clears same key that is unrelated to actual db transactions + state uint32_t bytesGenerated = 0; + state uint32_t targetKbThisSec = + targetKbPerSec / 2 + (uint32_t)(deterministicRandom()->random01() * targetKbPerSec); + state uint32_t mutationsGenerated = 0; + while (bytesGenerated < targetKbThisSec) { + MutationAndVersion update; + update.v = version; + update.m.param1 = startKey; + update.m.param2 = startKey; + if (deterministicRandom()->random01() < 0.5) { + // clear start key + update.m.type = MutationRef::Type::ClearRange; + } else { + // set + update.m.type = MutationRef::Type::SetValue; + } + mutationsGenerated++; + bytesGenerated += 17 + 2 * startKey.size(); + mutationStream.send(update); + + // simulate multiple mutations with same version (TODO: this should be possible right) + if (deterministicRandom()->random01() < 0.4) { + version++; + } + if (mutationsGenerated % 1000 == 0) { + wait(yield()); + } + } + + // printf("Fake range feed generated %d mutations at version %lld\n", mutationsGenerated, version); + + choose { + when(wait(delay(1.0))) { + // slightly slower than real versions, to try to ensure it doesn't get ahead + version += 950000; + } + when(Version _v = waitNext(snapshotVersions.getFuture())) { + if (_v > version) { + printf("updating fake range feed from %lld to snapshot version %lld\n", version, _v); + version = _v; + } else { + printf("snapshot version %lld was ahead of fake range feed version %lld, keeping fake version\n", + _v, + version); + } + } + } + } +} + +// TODO REMOVE eventually +ACTOR Future nukeBlobWorkerData(RatekeeperData* 
self) { + state Reference tr = makeReference(self->db); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + loop { + try { + tr->clear(blobWorkerListKeys); + tr->clear(blobGranuleMappingKeys); + + return Void(); + } catch (Error& e) { + printf("Nuking blob worker data got error %s\n", e.name()); + wait(tr->onError(e)); + } + } +} + +// TODO MOVE ELSEWHERE +ACTOR Future blobManagerPoc(RatekeeperData* self, LocalityData locality) { + state Future currentWorker; + state Future currentRangeFeed; + state Reference bstore; + state std::string bucket = SERVER_KNOBS->BG_BUCKET; + + printf("Initializing blob manager s3 stuff\n"); + try { + printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str()); + bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL); + printf("checking if bucket %s exists\n", bucket.c_str()); + bool bExists = wait(bstore->bucketExists(bucket)); + if (!bExists) { + printf("Bucket %s does not exist!\n", bucket.c_str()); + return Void(); + } + } catch (Error& e) { + printf("Blob manager got s3 init error %s\n", e.name()); + return Void(); + } + + // TODO remove once we have persistence + failure detection + // TODO move to separate actor + printf("Blob manager nuking previous workers and range assignments on startup\n"); + wait(nukeBlobWorkerData(self)); + printf("Blob manager nuked previous workers and range assignments\n"); + + loop { + state Reference tr = makeReference(self->db); + + printf("Blob manager checking for range updates\n"); + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + // TODO probably knobs here? This should always be pretty small though + RangeResult results = wait(krmGetRanges( + tr, blobRangeKeys.begin, KeyRange(allKeys), 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); + ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); + + bool foundRange = false; + for (int i = 0; i < results.size() - 1; i++) { + if (results[i].value == LiteralStringRef("1")) { + ASSERT(!foundRange); // TODO for now only assume at most 1 range is set + ASSERT(results[i + 1].value == StringRef()); + printf("Blob manager sees range [%s - %s)\n", + results[i].key.printable().c_str(), + results[i + 1].key.printable().c_str()); + + // TODO better way to make standalone to copy the keys into? 
+ self->granuleRange = KeyRangeRef(results[i].key, results[i + 1].key); + self->granuleMetadata = makeReference(); + foundRange = true; + PromiseStream mutationStream; + PromiseStream snapshotVersions; + currentRangeFeed = + fakeRangeFeed(mutationStream, snapshotVersions, results[i].key, results[i + 1].key); + currentWorker = blobWorkerCore(self, + self->granuleMetadata, + bstore, + bucket, + mutationStream, + snapshotVersions, + results[i].key, + results[i + 1].key, + locality); + } + } + if (!foundRange) { + // cancel current one + printf("Blob manager found zero blob ranges, cancelling current worker and fake range feed!\n"); + currentRangeFeed = Never(); + currentWorker = Never(); + } + + state Future watchFuture = tr->watch(blobRangeChangeKey); + wait(tr->commit()); + printf("Blob manager awaiting new range update\n"); + wait(watchFuture); + break; + } catch (Error& e) { + printf("Blob manager got error looking for range updates %s\n", e.name()); + wait(tr->onError(e)); + } + } + } +} + ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference> dbInfo) { state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); state Future timeout = Void(); @@ -1849,7 +1942,7 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, ReferenceBG_URL.c_str(), SERVER_KNOBS->BG_BUCKET.c_str()); - self.addActor.send(blobManagerPoc(&self)); + self.addActor.send(blobManagerPoc(&self, rkInterf.locality)); } TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) @@ -1944,12 +2037,6 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, ReferenceonChange())) { if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) { diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index ba0b15abb8..7c83a0f90c 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -32,8 +32,6 @@ struct RatekeeperInterface { RequestStream getRateInfo; RequestStream haltRatekeeper; RequestStream reportCommitCostEstimation; - // TODO REMOVE!!! - RequestStream blobGranuleFileRequest; struct LocalityData locality; UID myId; @@ -48,14 +46,7 @@ struct RatekeeperInterface { template void serialize(Archive& ar) { - serializer(ar, - waitFailure, - getRateInfo, - haltRatekeeper, - reportCommitCostEstimation, - blobGranuleFileRequest, - locality, - myId); + serializer(ar, waitFailure, getRateInfo, haltRatekeeper, reportCommitCostEstimation, locality, myId); } }; @@ -149,63 +140,4 @@ struct ReportCommitCostEstimationRequest { } }; -// TODO MOVE ELSEWHERE -struct MutationAndVersion { - constexpr static FileIdentifier file_identifier = 4268041; - MutationRef m; - Version v; - - MutationAndVersion() {} - MutationAndVersion(Arena& to, MutationRef m, Version v) : m(to, m), v(v) {} - MutationAndVersion(Arena& to, const MutationAndVersion& from) : m(to, from.m), v(from.v) {} - - template - void serialize(Ar& ar) { - serializer(ar, m, v); - } -}; - -// the assumption of this response is -struct BlobGranuleChunk { - KeyRangeRef keyRange; - StringRef snapshotFileName; - VectorRef deltaFileNames; - VectorRef newDeltas; - - template - void serialize(Ar& ar) { - serializer(ar, keyRange, snapshotFileName, deltaFileNames, newDeltas); - } -}; - -struct BlobGranuleFileReply { - // TODO "proper" way to generate file_identifier? 
- constexpr static FileIdentifier file_identifier = 6858612; - Arena arena; - VectorRef chunks; - - template - void serialize(Ar& ar) { - serializer(ar, chunks, arena); - } -}; - -// TODO could do a reply promise stream of file mutations to bound memory requirements? -// Have to load whole snapshot file into memory though so it doesn't actually matter too much -struct BlobGranuleFileRequest { - - constexpr static FileIdentifier file_identifier = 4150141; - Arena arena; - KeyRangeRef keyRange; - Version readVersion; - ReplyPromise reply; - - BlobGranuleFileRequest() {} - - template - void serialize(Ar& ar) { - serializer(ar, keyRange, readVersion, reply, arena); - } -}; - #endif // FDBSERVER_RATEKEEPERINTERFACE_H diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 3f454e79f4..8bcb19ae63 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2110,13 +2110,13 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion"); //.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end); //} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", - //req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", //"None").detail("In", "getKeyValues>getShardKeyRange"); throw e; } if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) { // TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", - //req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", - //shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", + // shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); throw wrong_shard_server(); } @@ -2193,10 +2193,10 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe } /*for( int i = 0; i < r.data.size(); i++ ) { - StorageMetrics m; - m.bytesPerKSecond = r.data[i].expectedSize(); - m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int - data->metrics.notify(r.data[i].key, m); + StorageMetrics m; + m.bytesPerKSecond = r.data[i].expectedSize(); + m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an + int data->metrics.notify(r.data[i].key, m); }*/ // For performance concerns, the cost of a range read is billed to the start key and end key of the diff --git a/tests/BlobGranuleReaderUnit.txt b/tests/BlobGranuleReaderUnit.txt new file mode 100644 index 0000000000..efdeffe085 --- /dev/null +++ b/tests/BlobGranuleReaderUnit.txt @@ -0,0 +1,7 @@ +testTitle=UnitTests +startDelay=0 +useDB=false + + testName=UnitTests + maxTestCases=0 + testsMatching=/blobgranule/ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2105391eef..d391159185 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -43,6 +43,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES BackupContainers.txt IGNORE) add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE) add_fdb_test(TEST_FILES BigInsert.txt IGNORE) + add_fdb_test(TEST_FILES 
BlobGranuleReaderUnit.txt) # TODO re-ignore! add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE) add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE) add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)
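
A minimal standalone sketch of the merge that readBlobGranule/applyDeltas perform: load the snapshot file into a sorted map restricted to the requested key range, then apply version-ordered deltas (from delta files and the in-memory newDeltas) up to the read version, and return the resulting rows. This is not FDB code; std::string and the Mutation struct below are hypothetical stand-ins for StringRef/MutationRef/GranuleDeltas, atomic ops are omitted, and the clear is clamped to the range rather than relying on the map already being range-filtered.

#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

struct Mutation {
    enum Type { SetValue, ClearRange } type;
    std::string param1, param2; // key/value for SetValue, begin/end for ClearRange
    long long version;
};

using DataMap = std::map<std::string, std::string>;

// Apply one mutation, restricted to [rangeBegin, rangeEnd), mirroring applyDelta().
static void applyDelta(DataMap& data, const std::string& rangeBegin, const std::string& rangeEnd, const Mutation& m) {
    if (m.type == Mutation::ClearRange) {
        if (m.param2 <= rangeBegin || m.param1 >= rangeEnd)
            return;
        // Erase [max(param1, rangeBegin), min(param2, rangeEnd)); lower_bound gives the
        // first key >= the argument, and map::erase treats the second iterator as exclusive.
        auto b = data.lower_bound(std::max(m.param1, rangeBegin));
        auto e = data.lower_bound(std::min(m.param2, rangeEnd));
        data.erase(b, e);
    } else {
        if (m.param1 < rangeBegin || m.param1 >= rangeEnd)
            return;
        data[m.param1] = m.param2;
    }
}

int main() {
    // 1. "Snapshot file": the sorted base image of the granule, filtered to the read range.
    DataMap data = { { "a", "1" }, { "b", "1" }, { "c", "1" } };

    // 2. "Delta files" + in-memory deltas: version-ordered mutations, applied up to readVersion.
    std::vector<Mutation> deltas = {
        { Mutation::SetValue, "b", "2", 100 },
        { Mutation::ClearRange, "a", "b", 150 },
        { Mutation::SetValue, "d", "9", 300 }, // beyond readVersion, skipped
    };
    long long readVersion = 200;
    for (const Mutation& m : deltas) {
        if (m.version > readVersion)
            break;
        applyDelta(data, "a", "z", m);
    }

    // 3. Result: the reconstructed view of the granule at readVersion.
    assert(data == (DataMap{ { "b", "2" }, { "c", "1" } }));
    return 0;
}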