Blob worker handles multiple ranges now

Josh Slocum 2021-08-04 16:47:18 -05:00
parent e5b0cf20aa
commit 247602753d
12 changed files with 1278 additions and 368 deletions

View File

@ -1886,7 +1886,7 @@ ACTOR Future<Void> setBlobRange(Database db, Key startKey, Key endKey, Value val
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
wait(krmSetRangeCoalescing(
tr, blobRangeKeys.begin, KeyRange(KeyRangeRef(startKey, endKey)), KeyRange(allKeys), value));
tr, blobRangeKeys.begin, KeyRange(KeyRangeRef(startKey, endKey)), KeyRange(normalKeys), value));
wait(tr->commit());
printf("Successfully updated blob range [%s - %s) to %s\n",
startKey.printable().c_str(),

View File

@ -29,64 +29,127 @@
// sorted thing could work. And if it used arenas it'd probably be more efficient with allocations, since everything
// else is in 1 arena and discarded at the end.
// TODO could refactor the like 6 lines of file reading code from here and the delta file function into another actor,
// TODO could refactor the file reading code from here and the delta file function into another actor,
// then this part would also be testable (a possible shape is sketched after readDeltaFile below)
ACTOR Future<Void> readSnapshotFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
Arena arena,
KeyRangeRef keyRange,
std::map<KeyRef, ValueRef>* dataMap) {
state AsyncFileS3BlobStoreRead reader(bstore, bucket, filename);
state int64_t size = wait(reader.size());
state uint8_t* data = new (arena) uint8_t[size];
printf("Reading %d bytes from snapshot file %s\n", size, filename.c_str());
int readSize = wait(reader.read(data, size, 0));
printf("Read %d bytes from snapshot file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
StringRef dataRef(data, size);
ACTOR Future<Arena> readSnapshotFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
KeyRangeRef keyRange,
std::map<KeyRef, ValueRef>* dataMap) {
try {
state Arena arena;
printf("Starting read of snapshot file %s\n", filename.c_str());
state Reference<AsyncFileS3BlobStoreRead> reader =
makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, filename);
state int64_t size = wait(reader->size());
printf("Got snapshot file size %lld\n", size);
state uint8_t* data = new (arena) uint8_t[size];
printf("Reading %lld bytes from snapshot file %s\n", size, filename.c_str());
int readSize = wait(reader->read(data, size, 0));
printf("Read %lld bytes from snapshot file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
GranuleSnapshot snapshot = ObjectReader::fromStringRef<GranuleSnapshot>(dataRef, Unversioned());
printf("Parsed %d rows from snapshot file %s\n", snapshot.size(), filename.c_str());
int i = 0;
while (i < snapshot.size() && snapshot[i].key < keyRange.begin) {
i++;
// Deserialize by hand so the VectorRef allocations made while parsing end up owned by this arena
// (the two-arena pattern is sketched after this function)
Arena parseArena;
GranuleSnapshot snapshot;
StringRef dataRef(data, size);
ArenaObjectReader rdr(arena, dataRef, Unversioned());
rdr.deserialize(FileIdentifierFor<GranuleSnapshot>::value, snapshot, parseArena);
arena.dependsOn(parseArena);
// GranuleSnapshot snapshot = ObjectReader::fromStringRef<GranuleSnapshot>(dataRef, Unversioned();)
printf("Parsed %d rows from snapshot file %s\n", snapshot.size(), filename.c_str());
// TODO REMOVE sanity check eventually
for (int i = 0; i < snapshot.size() - 1; i++) {
if (snapshot[i].key >= snapshot[i + 1].key) {
printf("BG SORT ORDER VIOLATION IN SNAPSHOT FILE: '%s', '%s'\n",
snapshot[i].key.printable().c_str(),
snapshot[i + 1].key.printable().c_str());
}
ASSERT(snapshot[i].key < snapshot[i + 1].key);
}
int i = 0;
while (i < snapshot.size() && snapshot[i].key < keyRange.begin) {
if (snapshot.size() < 10) { // debug
printf(" Pruning %s < %s\n", snapshot[i].key.printable().c_str(), keyRange.begin.printable().c_str());
}
i++;
}
while (i < snapshot.size() && snapshot[i].key < keyRange.end) {
dataMap->insert({ snapshot[i].key, snapshot[i].value });
if (snapshot.size() < 10) { // debug
printf(" Including %s\n", snapshot[i].key.printable().c_str());
}
i++;
}
if (snapshot.size() < 10) { // debug
while (i < snapshot.size()) {
printf(" Pruning %s >= %s\n", snapshot[i].key.printable().c_str(), keyRange.end.printable().c_str());
i++;
}
}
printf("Started with %d rows from snapshot file %s after pruning to [%s - %s)\n",
dataMap->size(),
filename.c_str(),
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
return arena;
} catch (Error& e) {
printf("Reading snapshot file %s got error %s\n", filename.c_str(), e.name());
throw e;
}
while (i < snapshot.size() && snapshot[i].key < keyRange.end) {
dataMap->insert({ snapshot[i].key, snapshot[i].value });
}
printf("Started with %d rows from snapshot file %s\n", dataMap->size(), filename.c_str());
return Void();
}
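The new readSnapshotFile owns all of its allocations and returns the Arena that keeps the parsed KeyRef/ValueRef pairs alive. A minimal sketch of the two-arena deserialization it uses, with a hypothetical helper name (parseIntoArena is not part of this commit; the ArenaObjectReader calls are the same ones used above):

// Hypothetical helper: deserialize a flat buffer whose bytes already live in dataArena,
// sending the VectorRef allocations made during parsing to a scratch arena that
// dataArena then depends on, so the caller only has to keep one arena alive.
template <class T>
void parseIntoArena(Arena& dataArena, StringRef serialized, T& out) {
	Arena parseArena; // backing storage created while deserializing
	ArenaObjectReader rdr(dataArena, serialized, Unversioned());
	rdr.deserialize(FileIdentifierFor<T>::value, out, parseArena);
	dataArena.dependsOn(parseArena); // parsed refs stay valid as long as dataArena lives
}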
ACTOR Future<GranuleDeltas> readDeltaFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
Arena arena,
KeyRangeRef keyRange,
Version readVersion) {
state AsyncFileS3BlobStoreRead reader(bstore, bucket, filename);
state int64_t size = wait(reader.size());
state uint8_t* data = new (arena) uint8_t[size];
printf("Reading %d bytes from delta file %s\n", size, filename.c_str());
int readSize = wait(reader.read(data, size, 0));
printf("Read %d bytes from delta file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
ACTOR Future<Standalone<GranuleDeltas>> readDeltaFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
KeyRangeRef keyRange,
Version readVersion) {
try {
printf("Starting read of delta file %s\n", filename.c_str());
state Standalone<GranuleDeltas> result;
state Reference<AsyncFileS3BlobStoreRead> reader =
makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, filename);
state int64_t size = wait(reader->size());
printf("Got delta file size %lld\n", size);
state uint8_t* data = new (result.arena()) uint8_t[size];
printf("Reading %lld bytes from delta file %s into %p\n", size, filename.c_str(), data);
int readSize = wait(reader->read(data, size, 0));
printf("Read %d bytes from delta file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
// Don't do range or version filtering in here since we'd have to copy/rewrite the deltas and it might starve
// snapshot read task, do it in main thread
// Don't do range or version filtering in here since we'd have to copy/rewrite the deltas and it might starve
// snapshot read task, do it in main thread
StringRef dataRef(data, size);
// Same two-arena deserialization pattern as in readSnapshotFile: parse allocations go to a scratch
// arena that the result arena then depends on
Arena parseArena;
StringRef dataRef(data, size);
ArenaObjectReader rdr(result.arena(), dataRef, Unversioned());
rdr.deserialize(FileIdentifierFor<GranuleDeltas>::value, result.contents(), parseArena);
result.arena().dependsOn(parseArena);
GranuleDeltas deltas = ObjectReader::fromStringRef<GranuleDeltas>(dataRef, Unversioned());
printf("Parsed %d deltas from delta file %s\n", deltas.size(), filename.c_str());
return deltas;
// result.contents() = ObjectReader::fromStringRef<GranuleDeltas>(dataRef, Unversioned());
printf("Parsed %d deltas from delta file %s\n", result.size(), filename.c_str());
// TODO REMOVE sanity check
for (int i = 0; i < result.size() - 1; i++) {
if (result[i].v > result[i + 1].v) {
printf("BG VERSION ORDER VIOLATION IN DELTA FILE: '%lld', '%lld'\n", result[i].v, result[i + 1].v);
}
ASSERT(result[i].v <= result[i + 1].v);
}
return result;
} catch (Error& e) {
printf("Reading delta file %s got error %s\n", filename.c_str(), e.name());
throw e;
}
}
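The TODO near the top of readSnapshotFile mentions pulling the shared S3 read out of both functions. A hedged sketch of what that shared actor could look like, built only from the calls already used above (readEntireFile is a hypothetical name, not part of this commit):

// Hypothetical shared reader: fetch an entire object from the blob store into the given
// arena and hand back the bytes; readSnapshotFile and readDeltaFile would then only
// differ in how they deserialize the returned StringRef.
ACTOR Future<StringRef> readEntireFile(Reference<S3BlobStoreEndpoint> bstore,
                                       std::string bucket,
                                       std::string filename,
                                       Arena arena) {
	state Reference<AsyncFileS3BlobStoreRead> reader =
	    makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, filename);
	state int64_t size = wait(reader->size());
	state uint8_t* data = new (arena) uint8_t[size];
	int readSize = wait(reader->read(data, size, 0));
	ASSERT(readSize == size);
	return StringRef(data, size);
}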
// TODO unit test this!
// TODO this giant switch is mostly lifted from storage server.
// Could refactor atomics to have a generic "handle this atomic mutation" thing instead of having to duplicate code with
// the switch statement everywhere?
@ -192,38 +255,48 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
Version readVersion,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket) {
// arena to hold all stuff for parsing and updaing mutations. Most of it will likely be tossed, so we copy at the
// end instead of doing a dependsOn
// Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
// will likely be tossed if there are a significant number of mutations, so we copy at the end instead of doing a
// dependsOn.
// FIXME: below some threshold where only a small percentage of the data actually changed, it probably makes
// sense to just dependsOn instead of copy, trading a little extra memory footprint for less CPU (a possible
// heuristic is sketched after this function)
state Arena arena;
state std::map<KeyRef, ValueRef> dataMap;
Future<Void> readSnapshotFuture =
readSnapshotFile(bstore, bucket, chunk.snapshotFileName.toString(), arena, keyRange, &dataMap);
state std::vector<Future<GranuleDeltas>> readDeltaFutures;
readDeltaFutures.resize(chunk.deltaFileNames.size());
for (StringRef deltaFileName : chunk.deltaFileNames) {
readDeltaFutures.push_back(
readDeltaFile(bstore, bucket, deltaFileName.toString(), arena, keyRange, readVersion));
}
try {
state std::map<KeyRef, ValueRef> dataMap;
Future<Arena> readSnapshotFuture =
readSnapshotFile(bstore, bucket, chunk.snapshotFileName.toString(), keyRange, &dataMap);
state std::vector<Future<Standalone<GranuleDeltas>>> readDeltaFutures;
readDeltaFutures.reserve(chunk.deltaFileNames.size());
for (StringRef deltaFileName : chunk.deltaFileNames) {
readDeltaFutures.push_back(readDeltaFile(bstore, bucket, deltaFileName.toString(), keyRange, readVersion));
}
wait(readSnapshotFuture);
for (Future<GranuleDeltas> deltaFuture : readDeltaFutures) {
GranuleDeltas result = wait(deltaFuture);
applyDeltas(&dataMap, arena, result, keyRange, readVersion);
Arena snapshotArena = wait(readSnapshotFuture);
arena.dependsOn(snapshotArena);
printf("Applying %d delta files\n", readDeltaFutures.size());
for (Future<Standalone<GranuleDeltas>> deltaFuture : readDeltaFutures) {
Standalone<GranuleDeltas> result = wait(deltaFuture);
arena.dependsOn(result.arena());
applyDeltas(&dataMap, arena, result, keyRange, readVersion);
wait(yield());
}
printf("Applying %d memory deltas\n", chunk.newDeltas.size());
applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion);
wait(yield());
}
printf("Applying %d memory deltas\n", chunk.newDeltas.size());
applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion);
wait(yield());
RangeResult ret;
for (auto& it : dataMap) {
ret.push_back_deep(ret.arena(), KeyValueRef(it.first, it.second));
// TODO for large reads, probably wait to yield periodically here for slowTask
}
printf("Final processing ended up with %d rows\n", ret.size());
RangeResult ret;
for (auto& it : dataMap) {
ret.push_back_deep(ret.arena(), KeyValueRef(it.first, it.second));
// TODO for large reads, probably wait to yield periodically here for slowTask
}
return ret;
return ret;
} catch (Error& e) {
printf("Reading blob granule got error %s\n", e.name());
throw e;
}
}
// TODO probably should add things like limit/bytelimit at some point?
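The FIXME at the top of readBlobGranule asks when the deep copy is worth it. One possible heuristic, purely illustrative (neither the helper nor the 10% cutoff is part of this commit):

// Hypothetical: deep-copy the surviving rows only when they are a small fraction of the
// bytes loaded from the blob store; otherwise dependsOn() the file arenas and accept the
// larger memory footprint to save CPU. The 10% threshold is an assumption.
static bool shouldDeepCopyResult(int64_t survivingBytes, int64_t loadedBytes) {
	return survivingBytes * 10 < loadedBytes;
}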

View File

@ -33,7 +33,7 @@
#include "flow/actorcompiler.h" // This must be the last #include.
// Reads the fileset in the reply using the provided blob store, amnd filters data and mutations by key + version from
// Reads the fileset in the reply using the provided blob store, and filters data and mutations by key + version from
// the request
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
KeyRangeRef keyRange,

View File

@ -64,6 +64,7 @@ struct MutationAndVersion {
}
};
// TODO should name all things that don't have their own arena *Ref
// file format of actual blob files
struct GranuleSnapshot : VectorRef<KeyValueRef> {

View File

@ -747,7 +747,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// Blob granules
init( BG_URL, "" );
init( BG_BUCKET, "" );
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 );
// TODO CHANGE BACK
// init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 );
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 1000000 );
init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );
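For reference, with the temporary 1,000,000-byte snapshot target these derived knobs work out to BG_DELTA_BYTES_BEFORE_COMPACT = 500,000 and BG_DELTA_FILE_TARGET_BYTES = 50,000; the commented-out 10,000,000 target would give 5,000,000 and 500,000 respectively.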

View File

@ -0,0 +1,451 @@
/*
* BlobManager.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/SystemData.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbserver/BlobManagerInterface.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
Arena ar,
VectorRef<KeyRangeRef>* rangesToAdd,
VectorRef<KeyRangeRef>* rangesToRemove,
KeyRef rangeStart,
KeyRef rangeEnd,
bool rangeActive) {
printf("db range [%s - %s): %s\n",
rangeStart.printable().c_str(),
rangeEnd.printable().c_str(),
rangeActive ? "T" : "F");
KeyRange keyRange(KeyRangeRef(rangeStart, rangeEnd));
/*auto ranges = knownBlobRanges->getAffectedRangesAfterInsertion(keyRange, rangeActive);
for (int i = 0; i < ranges.size(); i++) {
if (ranges[i].value != rangeActive || ranges[i].begin == rangeStart && ranges[i].end == rangeEnd) {
if (rangeActive) {
printf("BM Adding client range [%s - %s)\n",
ranges[i].begin.printable().c_str(),
ranges[i].end.printable().c_str());
rangesToAdd->push_back_deep(ar, KeyRangeRef(ranges[i].begin, ranges[i].end));
} else {
printf("BM Removing client range [%s - %s)\n",
ranges[i].begin.printable().c_str(),
ranges[i].end.printable().c_str());
rangesToRemove->push_back_deep(ar, KeyRangeRef(ranges[i].begin, ranges[i].end));
}
} else {
printf("BM range [%s - %s) has same value\n",
ranges[i].begin.printable().c_str(),
ranges[i].end.printable().c_str());
}
}
*/
auto allRanges = knownBlobRanges->intersectingRanges(keyRange);
for (auto& r : allRanges) {
if (r.value() != rangeActive) {
KeyRef overlapStart = (r.begin() > keyRange.begin) ? r.begin() : keyRange.begin;
KeyRef overlapEnd = (keyRange.end < r.end()) ? keyRange.end : r.end();
KeyRangeRef overlap(overlapStart, overlapEnd);
if (rangeActive) {
printf("BM Adding client range [%s - %s)\n",
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());
rangesToAdd->push_back_deep(ar, overlap);
} else {
printf("BM Removing client range [%s - %s)\n",
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());
rangesToRemove->push_back_deep(ar, overlap);
}
}
// results.emplace_back(r.range(), r.value());
// printf(" [%s - %s): %s\n", r.begin().printable().c_str(), r.end().printable().c_str(), r.value() ? "T" :
// "F");
}
knownBlobRanges->insert(keyRange, rangeActive);
}
void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
RangeResult dbBlobRanges,
Arena ar,
VectorRef<KeyRangeRef>* rangesToAdd,
VectorRef<KeyRangeRef>* rangesToRemove) {
printf("Updating %d client blob ranges", dbBlobRanges.size() / 2);
for (int i = 0; i < dbBlobRanges.size() - 1; i += 2) {
printf(" [%s - %s)", dbBlobRanges[i].key.printable().c_str(), dbBlobRanges[i + 1].key.printable().c_str());
}
printf("\n");
// essentially do merge diff of current known blob ranges and new ranges, to assign new ranges to
// workers and revoke old ranges from workers
// basically, for any range that is set in results that isn't set in ranges, assign the range to the
// worker. for any range that isn't set in results that is set in ranges, revoke the range from the
// worker. and, update ranges to match results as you go
// FIXME: could change this to O(N) instead of O(NLogN) by doing a sorted merge instead of requesting the
// intersection for each insert, but this operation is pretty infrequent so it's probably not necessary
if (dbBlobRanges.size() == 0) {
// special case. Nothing in the DB, reset knownBlobRanges and revoke all existing ranges from workers
handleClientBlobRange(
knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, normalKeys.end, false);
} else {
if (dbBlobRanges[0].key > normalKeys.begin) {
handleClientBlobRange(
knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, dbBlobRanges[0].key, false);
}
for (int i = 0; i < dbBlobRanges.size() - 1; i++) {
if (dbBlobRanges[i].key >= normalKeys.end) {
printf("Found invalid blob range start %s\n", dbBlobRanges[i].key.printable().c_str());
break;
}
bool active = dbBlobRanges[i].value == LiteralStringRef("1");
if (active) {
ASSERT(dbBlobRanges[i + 1].value == StringRef());
printf("BM sees client range [%s - %s)\n",
dbBlobRanges[i].key.printable().c_str(),
dbBlobRanges[i + 1].key.printable().c_str());
}
KeyRef endKey = dbBlobRanges[i + 1].key;
if (endKey > normalKeys.end) {
printf("Removing system keyspace from blob range [%s - %s)\n",
dbBlobRanges[i].key.printable().c_str(),
endKey.printable().c_str());
endKey = normalKeys.end;
}
handleClientBlobRange(
knownBlobRanges, ar, rangesToAdd, rangesToRemove, dbBlobRanges[i].key, endKey, active);
}
if (dbBlobRanges[dbBlobRanges.size() - 1].key < normalKeys.end) {
handleClientBlobRange(knownBlobRanges,
ar,
rangesToAdd,
rangesToRemove,
dbBlobRanges[dbBlobRanges.size() - 1].key,
normalKeys.end,
false);
}
}
knownBlobRanges->coalesce(normalKeys);
}
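A minimal usage sketch of the diff-and-merge above. The key names follow the unit test below, and dbDataAB / dbDataAD are assumed to be built exactly as that test builds them:

// Starting from an empty map, the first pass reports [A - B) as added; widening the DB
// range to [A - D) on the second pass reports only the newly covered piece [B - D).
KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
Arena ar;
VectorRef<KeyRangeRef> added, removed;
updateClientBlobRanges(&knownBlobRanges, dbDataAB, ar, &added, &removed); // added = { [A - B) }
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed); // added = { [B - D) }, removed empty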
void getRanges(std::vector<std::pair<KeyRangeRef, bool>>& results, KeyRangeMap<bool>& knownBlobRanges) {
printf("Getting ranges:\n");
auto allRanges = knownBlobRanges.ranges();
for (auto& r : allRanges) {
results.emplace_back(r.range(), r.value());
printf(" [%s - %s): %s\n", r.begin().printable().c_str(), r.end().printable().c_str(), r.value() ? "T" : "F");
}
}
// Test:
// start empty
// DB has [A - B). That should show up in knownBlobRanges and should be in added
// DB has nothing. knownBlobRanges should be empty and [A - B) should be in removed
// DB has [A - B) and [C - D). They should both show up in knownBlobRanges and added.
// DB has [A - D). It should show up coalesced in knownBlobRanges, and [B - C) should be in added.
// DB has [A - C). It should show up coalesced in knownBlobRanges, and [C - D) should be in removed.
// DB has [B - C). It should show up coalesced in knownBlobRanges, and [A - B) should be removed.
// DB has [B - D). It should show up coalesced in knownBlobRanges, and [C - D) should be removed.
// DB has [A - D). It should show up coalesced in knownBlobRanges, and [A - B) should be removed.
// DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D) should be
// in removed.
TEST_CASE("/blobmanager/updateranges") {
KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
Arena ar;
VectorRef<KeyRangeRef> added;
VectorRef<KeyRangeRef> removed;
StringRef active = LiteralStringRef("1");
StringRef inactive = StringRef();
RangeResult dbDataEmpty;
vector<std::pair<KeyRangeRef, bool>> kbrRanges;
StringRef keyA = StringRef(ar, LiteralStringRef("A"));
StringRef keyB = StringRef(ar, LiteralStringRef("B"));
StringRef keyC = StringRef(ar, LiteralStringRef("C"));
StringRef keyD = StringRef(ar, LiteralStringRef("D"));
// db data setup
RangeResult dbDataAB;
dbDataAB.emplace_back(ar, keyA, active);
dbDataAB.emplace_back(ar, keyB, inactive);
RangeResult dbDataAC;
dbDataAC.emplace_back(ar, keyA, active);
dbDataAC.emplace_back(ar, keyC, inactive);
RangeResult dbDataAD;
dbDataAD.emplace_back(ar, keyA, active);
dbDataAD.emplace_back(ar, keyD, inactive);
RangeResult dbDataBC;
dbDataBC.emplace_back(ar, keyB, active);
dbDataBC.emplace_back(ar, keyC, inactive);
RangeResult dbDataBD;
dbDataBD.emplace_back(ar, keyB, active);
dbDataBD.emplace_back(ar, keyD, inactive);
RangeResult dbDataCD;
dbDataCD.emplace_back(ar, keyC, active);
dbDataCD.emplace_back(ar, keyD, inactive);
RangeResult dbDataAB_CD;
dbDataAB_CD.emplace_back(ar, keyA, active);
dbDataAB_CD.emplace_back(ar, keyB, inactive);
dbDataAB_CD.emplace_back(ar, keyC, active);
dbDataAB_CD.emplace_back(ar, keyD, inactive);
// key ranges setup
KeyRangeRef rangeAB = KeyRangeRef(keyA, keyB);
KeyRangeRef rangeAC = KeyRangeRef(keyA, keyC);
KeyRangeRef rangeAD = KeyRangeRef(keyA, keyD);
KeyRangeRef rangeBC = KeyRangeRef(keyB, keyC);
KeyRangeRef rangeBD = KeyRangeRef(keyB, keyD);
KeyRangeRef rangeCD = KeyRangeRef(keyC, keyD);
KeyRangeRef rangeStartToA = KeyRangeRef(normalKeys.begin, keyA);
KeyRangeRef rangeStartToB = KeyRangeRef(normalKeys.begin, keyB);
KeyRangeRef rangeStartToC = KeyRangeRef(normalKeys.begin, keyC);
KeyRangeRef rangeBToEnd = KeyRangeRef(keyB, normalKeys.end);
KeyRangeRef rangeCToEnd = KeyRangeRef(keyC, normalKeys.end);
KeyRangeRef rangeDToEnd = KeyRangeRef(keyD, normalKeys.end);
// actual test
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 1);
ASSERT(kbrRanges[0].first == normalKeys);
ASSERT(!kbrRanges[0].second);
// DB has [A - B)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAB, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeAB);
ASSERT(removed.size() == 0);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToA);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeAB);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeBToEnd);
ASSERT(!kbrRanges[2].second);
// DB has nothing
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataEmpty, ar, &added, &removed);
ASSERT(added.size() == 0);
ASSERT(removed.size() == 1);
ASSERT(removed[0] == rangeAB);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges[0].first == normalKeys);
ASSERT(!kbrRanges[0].second);
// DB has [A - B) and [C - D)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAB_CD, ar, &added, &removed);
ASSERT(added.size() == 2);
ASSERT(added[0] == rangeAB);
ASSERT(added[1] == rangeCD);
ASSERT(removed.size() == 0);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 5);
ASSERT(kbrRanges[0].first == rangeStartToA);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeAB);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeBC);
ASSERT(!kbrRanges[2].second);
ASSERT(kbrRanges[3].first == rangeCD);
ASSERT(kbrRanges[3].second);
ASSERT(kbrRanges[4].first == rangeDToEnd);
ASSERT(!kbrRanges[4].second);
// DB has [A - D)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeBC);
ASSERT(removed.size() == 0);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToA);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeAD);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeDToEnd);
ASSERT(!kbrRanges[2].second);
// DB has [A - C)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAC, ar, &added, &removed);
ASSERT(added.size() == 0);
ASSERT(removed.size() == 1);
ASSERT(removed[0] == rangeCD);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToA);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeAC);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeCToEnd);
ASSERT(!kbrRanges[2].second);
// DB has [B - C)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataBC, ar, &added, &removed);
ASSERT(added.size() == 0);
ASSERT(removed.size() == 1);
ASSERT(removed[0] == rangeAB);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToB);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeBC);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeCToEnd);
ASSERT(!kbrRanges[2].second);
// DB has [B - D)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataBD, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeCD);
ASSERT(removed.size() == 0);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToB);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeBD);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeDToEnd);
ASSERT(!kbrRanges[2].second);
// DB has [A - D)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeAB);
ASSERT(removed.size() == 0);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToA);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeAD);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeDToEnd);
ASSERT(!kbrRanges[2].second);
// DB has [A - B) and [C - D)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAB_CD, ar, &added, &removed);
ASSERT(added.size() == 0);
ASSERT(removed.size() == 1);
ASSERT(removed[0] == rangeBC);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 5);
ASSERT(kbrRanges[0].first == rangeStartToA);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeAB);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeBC);
ASSERT(!kbrRanges[2].second);
ASSERT(kbrRanges[3].first == rangeCD);
ASSERT(kbrRanges[3].second);
ASSERT(kbrRanges[4].first == rangeDToEnd);
ASSERT(!kbrRanges[4].second);
// DB has [B - C)
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataBC, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeBC);
ASSERT(removed.size() == 2);
ASSERT(removed[0] == rangeAB);
ASSERT(removed[1] == rangeCD);
getRanges(kbrRanges, knownBlobRanges);
ASSERT(kbrRanges.size() == 3);
ASSERT(kbrRanges[0].first == rangeStartToB);
ASSERT(!kbrRanges[0].second);
ASSERT(kbrRanges[1].first == rangeBC);
ASSERT(kbrRanges[1].second);
ASSERT(kbrRanges[2].first == rangeCToEnd);
ASSERT(!kbrRanges[2].second);
return Void();
}

View File

@ -0,0 +1,32 @@
/*
* BlobManagerInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_BLOBMANAGERINTERFACE_H
#define FDBSERVER_BLOBMANAGERINTERFACE_H
#pragma once
// TODO add actual interface, remove functionality stuff hack for ratekeeper
void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
RangeResult dbBlobRanges,
Arena a,
VectorRef<KeyRangeRef>* rangesToAdd,
VectorRef<KeyRangeRef>* rangesToRemove);
#endif

View File

@ -5,6 +5,8 @@ set(FDBSERVER_SRCS
BackupProgress.actor.cpp
BackupProgress.actor.h
BackupWorker.actor.cpp
BlobManager.actor.cpp
BlobManagerInterface.h
ClusterController.actor.cpp
ConfigBroadcaster.actor.cpp
ConfigBroadcaster.h

View File

@ -3094,6 +3094,9 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
state std::string bucket = SERVER_KNOBS->BG_BUCKET;
state Reference<S3BlobStoreEndpoint> bstore;
// TODO CHANGE BACK
wait(delay(10.0));
printf("Initializing CC s3 stuff\n");
try {
printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str());
@ -3171,6 +3174,9 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
req.readVersion = v;
ErrorOr<BlobGranuleFileReply> _rep =
wait(workerInterfaceCache[workerId].blobGranuleFileRequest.tryGetReply(req));
if (_rep.isError()) {
throw _rep.getError();
}
BlobGranuleFileReply rep = _rep.get();
printf("Blob granule request for [%s - %s) @ %lld got reply from %s:\n",
granuleStartKey.printable().c_str(),

File diff suppressed because it is too large

View File

@ -0,0 +1,7 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/blobmanager/

View File

@ -44,6 +44,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE)
add_fdb_test(TEST_FILES BigInsert.txt IGNORE)
add_fdb_test(TEST_FILES BlobGranuleReaderUnit.txt) # TODO re-ignore!
add_fdb_test(TEST_FILES BlobManagerUnit.txt) # TODO re-ignore!
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)