Add StorageServerInterface for BlobMigrator
parent 78e2871a79
commit f2289ced27
@@ -142,7 +142,6 @@ bool isRangeFullyCovered(KeyRange range, Standalone<VectorRef<BlobGranuleChunkRef
    for (const BlobGranuleChunkRef& chunk : blobChunks) {
        blobRanges.push_back(chunk.keyRange);
    }

    return range.isCovered(blobRanges);
}

@@ -194,7 +193,7 @@ TEST_CASE("/fdbserver/blobgranule/isRangeCoveredByBlob") {
        testAddChunkRange("key_a1"_sr, "key_a9"_sr, continuedChunks);
        testAddChunkRange("key_a9"_sr, "key_b1"_sr, continuedChunks);
        testAddChunkRange("key_b1"_sr, "key_b9"_sr, continuedChunks);
        ASSERT(isRangeFullyCovered(KeyRangeRef("key_a1"_sr, "key_b9"_sr), continuedChunks) == false);
        ASSERT(isRangeFullyCovered(KeyRangeRef("key_a1"_sr, "key_b9"_sr), continuedChunks));
    }
    return Void();
}

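Note: testAddChunkRange is defined earlier in the test file and is not shown in this diff. A plausible shape, assuming it only appends a chunk covering [begin, end) so that consecutive calls build a contiguous chunk list:

// Hypothetical helper sketch (the real definition is outside this diff).
static void testAddChunkRange(KeyRef begin, KeyRef end, Standalone<VectorRef<BlobGranuleChunkRef>>& chunks) {
    BlobGranuleChunkRef chunk;
    chunk.keyRange = KeyRangeRef(KeyRef(chunks.arena(), begin), KeyRef(chunks.arena(), end));
    chunks.push_back(chunks.arena(), chunk);
}
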
@@ -336,12 +336,13 @@ struct KeyRangeRef {
    bool isCovered(std::vector<KeyRangeRef>& ranges) {
        ASSERT(std::is_sorted(ranges.begin(), ranges.end(), KeyRangeRef::ArbitraryOrder()));
        KeyRangeRef clone(begin, end);

        for (auto r : ranges) {
            if (begin < r.begin)
            if (clone.begin < r.begin)
                return false; // uncovered gap between clone.begin and r.begin
            if (end <= r.end)
            if (clone.end <= r.end)
                return true; // range is fully covered
            if (end > r.begin)
            if (clone.end > r.begin)
                // {clone.begin, r.end} is covered. need to check coverage for {r.end, clone.end}
                clone = KeyRangeRef(r.end, clone.end);
        }

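Note: isCovered is a single left-to-right sweep over the sorted range list, repeatedly trimming the covered prefix off the query range. A standalone transliteration over half-open string ranges (plain C++, independent of FDB types) for reference:

#include <string>
#include <utility>
#include <vector>

// Same sweep as KeyRangeRef::isCovered above; `ranges` must be sorted by begin key.
bool isCovered(std::string begin, const std::string& end,
               const std::vector<std::pair<std::string, std::string>>& ranges) {
    for (const auto& r : ranges) {
        if (begin < r.first)
            return false; // uncovered gap before r
        if (end <= r.second)
            return true; // the remainder fits inside r
        if (end > r.first)
            begin = r.second; // [begin, r.second) is covered; keep checking the tail
    }
    return false; // ranges exhausted before covering [begin, end)
}

// e.g. isCovered("key_a1", "key_b9", { {"key_a1","key_a9"}, {"key_a9","key_b1"}, {"key_b1","key_b9"} })
// returns true, matching the test case above.
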
@@ -3537,7 +3537,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
    }

    // skip the rest of the algorithm for the first blob manager
    if (bmData->epoch == 1) {
    if (bmData->epoch == 1 && !isFullRestoreMode()) {
        bmData->doneRecovering.send(Void());
        return Void();
    }

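Note: with this change the first blob manager (epoch 1) skips recovery only when not in full-restore mode; presumably during a full restore the granule state must be rebuilt from the manifest rather than assumed empty.
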
@@ -26,6 +26,7 @@
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbserver/Knobs.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/BlobConnectionProvider.h"

@@ -189,23 +190,6 @@ private:
    static const int sMaxCount_{ 5 }; // max number of manifest files to keep
};

// Granule info needed by a full restore
struct BlobGranuleVersion {
    // Two constructors required by VectorRef
    BlobGranuleVersion() {}
    BlobGranuleVersion(Arena& a, const BlobGranuleVersion& copyFrom)
      : granuleID(copyFrom.granuleID), keyRange(a, copyFrom.keyRange), version(copyFrom.version),
        sizeInBytes(copyFrom.sizeInBytes) {}

    UID granuleID;
    KeyRangeRef keyRange;
    Version version;
    int64_t sizeInBytes;
};

// A vector of BlobGranuleVersion
typedef Standalone<VectorRef<BlobGranuleVersion>> BlobGranuleVersionVector;

// Filename, version, and size of each granule file needed by a full restore
struct GranuleFileVersion {
    Version version;

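Note: the "two constructors required by VectorRef" comment refers to the push_back_deep pattern: VectorRef copies each element into the destination arena through the (Arena&, const T&) constructor so that the contained KeyRangeRef outlives the source object. A sketch of the usage with hypothetical values (the struct is renamed BlobGranuleRestoreVersion later in this diff):

BlobGranuleRestoreVersionVector granules;
BlobGranuleRestoreVersion g;
g.granuleID = deterministicRandom()->randomUniqueID();
g.keyRange = KeyRangeRef("a"_sr, "b"_sr);
g.version = 100;
g.sizeInBytes = 1 << 20;
// push_back_deep invokes BlobGranuleRestoreVersion(Arena&, const BlobGranuleRestoreVersion&),
// re-pointing keyRange into granules.arena() so the element outlives `g`.
granules.push_back_deep(granules.arena(), g);
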
@@ -226,16 +210,53 @@ public:
            Value data = wait(readFromFile(self));
            Standalone<BlobManifest> manifest = decode(data);
            wait(writeSystemKeys(self, manifest.rows));
            BlobGranuleVersionVector _ = wait(listGranules(self));
            BlobGranuleRestoreVersionVector _ = wait(listGranules(self));
        } catch (Error& e) {
            dprint("WARNING: unexpected manifest loader error {}\n", e.what()); // skip error handling so far
        }
        return Void();
    }

    // Iterate active granules and return their version/sizes
    ACTOR static Future<BlobGranuleRestoreVersionVector> listGranules(Reference<BlobManifestLoader> self) {
        state Transaction tr(self->db_);
        loop {
            state BlobGranuleRestoreVersionVector results;
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);

            try {
                std::vector<KeyRangeRef> granules;
                state int i = 0;
                auto limit = GetRangeLimits::BYTE_LIMIT_UNLIMITED;
                state RangeResult blobRanges = wait(tr.getRange(blobGranuleMappingKeys, limit));
                for (i = 0; i < blobRanges.size() - 1; i++) {
                    Key startKey = blobRanges[i].key.removePrefix(blobGranuleMappingKeys.begin);
                    Key endKey = blobRanges[i + 1].key.removePrefix(blobGranuleMappingKeys.begin);
                    state KeyRange granuleRange = KeyRangeRef(startKey, endKey);
                    try {
                        Standalone<BlobGranuleRestoreVersion> granule = wait(getGranule(&tr, granuleRange));
                        results.push_back_deep(results.arena(), granule);
                    } catch (Error& e) {
                        if (e.code() == error_code_restore_missing_data) {
                            dprint("missing data for key range {} \n", granuleRange.toString());
                            TraceEvent("BlobRestoreMissingData").detail("KeyRange", granuleRange.toString());
                        } else {
                            throw;
                        }
                    }
                }
                return results;
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }

    // Print out a summary for blob granules
    ACTOR static Future<Void> print(Reference<BlobManifestLoader> self) {
        state BlobGranuleVersionVector granules = wait(listGranules(self));
        state BlobGranuleRestoreVersionVector granules = wait(listGranules(self));
        for (auto granule : granules) {
            wait(checkGranuleFiles(self, granule));
        }

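Note: the rows under blobGranuleMappingKeys are granule boundary keys, so each adjacent pair [i], [i + 1] yields one granule's half-open range; that is why the loop runs to blobRanges.size() - 1. A plain-C++ illustration of the convention:

#include <string>
#include <utility>
#include <vector>

// N sorted boundary keys define N - 1 half-open granule ranges.
std::vector<std::pair<std::string, std::string>> boundariesToRanges(const std::vector<std::string>& boundaries) {
    std::vector<std::pair<std::string, std::string>> ranges;
    for (size_t i = 0; i + 1 < boundaries.size(); i++) {
        ranges.push_back({ boundaries[i], boundaries[i + 1] });
    }
    return ranges;
}
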
@@ -285,41 +306,9 @@ private:
        }
    }

    // Iterate active granules and return their version/sizes
    ACTOR static Future<BlobGranuleVersionVector> listGranules(Reference<BlobManifestLoader> self) {
        state Transaction tr(self->db_);
        loop {
            state BlobGranuleVersionVector results;
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);

            try {
                std::vector<KeyRangeRef> granules;
                state int i = 0;
                auto limit = GetRangeLimits::BYTE_LIMIT_UNLIMITED;
                state RangeResult blobRanges = wait(tr.getRange(blobGranuleMappingKeys, limit));
                for (i = 0; i < blobRanges.size() - 1; i++) {
                    Key startKey = blobRanges[i].key.removePrefix(blobGranuleMappingKeys.begin);
                    Key endKey = blobRanges[i + 1].key.removePrefix(blobGranuleMappingKeys.begin);
                    state KeyRange granuleRange = KeyRangeRef(startKey, endKey);
                    try {
                        Standalone<BlobGranuleVersion> granule = wait(getGranule(&tr, granuleRange));
                        results.push_back_deep(results.arena(), granule);
                    } catch (Error& e) {
                        dprint("missing data for key range {} \n", granuleRange.toString());
                    }
                }
                return results;
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }

    // Find the newest granule for a key range. The newest granule has the max version and relevant files
    ACTOR static Future<Standalone<BlobGranuleVersion>> getGranule(Transaction* tr, KeyRangeRef range) {
        state Standalone<BlobGranuleVersion> granuleVersion;
    ACTOR static Future<Standalone<BlobGranuleRestoreVersion>> getGranule(Transaction* tr, KeyRangeRef range) {
        state Standalone<BlobGranuleRestoreVersion> granuleVersion;
        KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range);
        // reverse lookup so that the first row is the newest version
        state RangeResult results =

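Note: granule history rows sort by version in ascending key order, so "reverse lookup" means reading the history range backwards and taking the first row as the newest version. A standalone model of the idiom over an ordered map:

#include <map>
#include <string>

// Rows in [begin, end) sort ascending, so the newest one is the last; a reverse
// scan returns it first. Returns nullptr when the range is empty.
const std::pair<const std::string, int>* newestRow(const std::map<std::string, int>& rows,
                                                   const std::string& begin, const std::string& end) {
    auto it = rows.lower_bound(end); // first row at or past `end`
    if (it == rows.begin())
        return nullptr;
    --it; // last row before `end`
    if (it->first < begin)
        return nullptr; // nothing inside [begin, end)
    return &*it;
}
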
@@ -389,7 +378,7 @@ private:
    }

    // Read data from granules and print out a summary
    ACTOR static Future<Void> checkGranuleFiles(Reference<BlobManifestLoader> self, BlobGranuleVersion granule) {
    ACTOR static Future<Void> checkGranuleFiles(Reference<BlobManifestLoader> self, BlobGranuleRestoreVersion granule) {
        state KeyRangeRef range = granule.keyRange;
        state Version readVersion = granule.version;
        state Transaction tr(self->db_);

@@ -441,3 +430,11 @@ ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProvider> blobConn)
    wait(BlobManifestLoader::print(loader));
    return Void();
}

// API to list blob granules
ACTOR Future<BlobGranuleRestoreVersionVector> listBlobGranules(Database db,
                                                               Reference<BlobConnectionProvider> blobConn) {
    Reference<BlobManifestLoader> loader = makeReference<BlobManifestLoader>(db, blobConn);
    BlobGranuleRestoreVersionVector result = wait(BlobManifestLoader::listGranules(loader));
    return result;
}

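Note: a hypothetical caller of the new listBlobGranules API (not part of this commit), summing the bytes a full restore would migrate:

ACTOR Future<int64_t> totalRestoreBytes(Database db, Reference<BlobConnectionProvider> blobConn) {
    BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(db, blobConn));
    int64_t total = 0;
    for (auto& granule : granules) {
        total += granule.sizeInBytes;
    }
    return total;
}
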
@@ -30,54 +30,312 @@
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/WaitFailure.h"

#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "flow/actorcompiler.h" // has to be last include
#include "flow/network.h"
#include <algorithm>
#include <string>

#define ENABLE_DEBUG_MG true

template <typename... T>
static inline void dprint(fmt::format_string<T...> fmt, T&&... args) {
    if (ENABLE_DEBUG_MG)
        fmt::print(fmt, std::forward<T>(args)...);
}

// BlobMigrator manages data migration from blob storage to storage server. It implements a minimal set of
// StorageServerInterface APIs which are needed for DataDistributor to start data migration.
class BlobMigrator : public NonCopyable, public ReferenceCounted<BlobMigrator> {
public:
    BlobMigrator(Reference<AsyncVar<ServerDBInfo> const> dbInfo, BlobMigratorInterface interf)
      : blobMigratorInterf(interf), actors(false) {
        if (!blobConn.isValid() && SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
            blobConn = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
      : interf_(interf), actors_(false) {
        if (!blobConn_.isValid() && SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
            blobConn_ = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
        }
        db = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
        db_ = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
    }
    ~BlobMigrator() {}

    // Start migration
    ACTOR static Future<Void> start(Reference<BlobMigrator> self) {
        self->actors.add(waitFailureServer(self->blobMigratorInterf.waitFailure.getFuture()));
        if (!isFullRestoreMode()) {
            return Void();
        }
        wait(delay(10)); // TODO need to wait for a signal for readiness of blob manager

        BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
        self->blobGranules_ = granules;

        wait(prepare(self, normalKeys));

        wait(serverLoop(self));
        return Void();
    }

private:
    // Prepare for data migration for a given key range.
    ACTOR static Future<Void> prepare(Reference<BlobMigrator> self, KeyRangeRef keys) {
        // Register as a storage server, so that DataDistributor can start data movement afterwards
        std::pair<Version, Tag> verAndTag = wait(addStorageServer(self->db_, self->interf_.ssi));
        dprint("Started storage server interface {} {}\n", verAndTag.first, verAndTag.second.toString());

        // Reassign key ranges to the storage server.
        // It'll restart DataDistributor so that internal data structures like ShardTracker, ShardsAffectedByTeamFailure
        // can be re-initialized. Ideally this would be done within DataDistributor; then we wouldn't need to
        // restart DataDistributor.
        state int oldMode = wait(setDDMode(self->db_, 0));
        wait(unassignServerKeys(self, keys));
        wait(assignKeysToServer(self, keys, self->interf_.ssi.id()));
        wait(success(setDDMode(self->db_, oldMode)));
        return Void();
    }

    // Assign the given key range to the specified storage server. Subsequent
    ACTOR static Future<Void> assignKeysToServer(Reference<BlobMigrator> self, KeyRangeRef keys, UID serverUID) {
        state Transaction tr(self->db_);
        loop {
            choose {
                when(HaltBlobMigratorRequest req = waitNext(self->blobMigratorInterf.haltBlobMigrator.getFuture())) {
                    req.reply.send(Void());
                    TraceEvent("BlobMigratorHalted", self->blobMigratorInterf.id()).detail("ReqID", req.requesterID);
                    break;
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            try {
                state Value value = keyServersValue(std::vector<UID>({ serverUID }), std::vector<UID>(), UID(), UID());
                wait(krmSetRange(&tr, keyServersPrefix, keys, value));
                wait(krmSetRange(&tr, serverKeysPrefixFor(serverUID), keys, serverKeysTrue));
                wait(tr.commit());
                dprint("Assign {} to server {}\n", normalKeys.toString(), serverUID.toString());
                return Void();
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }

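Note: krmSetRange writes a key-range map, which stores one boundary key per value change; a lookup resolves to the nearest boundary at or below the key. A loosely simplified in-memory model of the write (the real version operates on database keys under a prefix such as serverKeysPrefixFor(id)):

#include <map>
#include <string>

// Set [begin, end) = v in a boundary-keyed range map: clear interior boundaries,
// write v at `begin`, and restore the previous value at `end`.
void krmSetRangeModel(std::map<std::string, std::string>& krm,
                      const std::string& begin, const std::string& end, const std::string& v) {
    std::string atEnd; // value in effect at `end` before this write
    auto it = krm.upper_bound(end);
    if (it != krm.begin())
        atEnd = std::prev(it)->second;
    krm.erase(krm.lower_bound(begin), krm.lower_bound(end));
    krm[begin] = v;
    krm[end] = atEnd; // keys at/after `end` keep their old value
}
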
    // Unassign given key range from its current storage servers
    ACTOR static Future<Void> unassignServerKeys(Reference<BlobMigrator> self, KeyRangeRef keys) {
        state Transaction tr(self->db_);
        loop {
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            try {
                state RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
                ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
                for (auto& server : serverList) {
                    state UID id = decodeServerListValue(server.value).id();
                    RangeResult ranges = wait(krmGetRanges(&tr, serverKeysPrefixFor(id), keys));
                    bool owning = false;
                    for (auto& r : ranges) {
                        if (r.value == serverKeysTrue) {
                            owning = true;
                            break;
                        }
                    }
                    if (owning) {
                        dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
                        wait(krmSetRange(&tr, serverKeysPrefixFor(id), keys, serverKeysFalse));
                    }
                }
                when(wait(self->actors.getResult())) {}
                wait(tr.commit());
                return Void();
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }

    // Main server loop
    ACTOR static Future<Void> serverLoop(Reference<BlobMigrator> self) {
        self->actors_.add(waitFailureServer(self->interf_.ssi.waitFailure.getFuture()));
        self->actors_.add(handleRequest(self));
        self->actors_.add(handleUnsupportedRequest(self));
        loop {
            try {
                choose {
                    when(HaltBlobMigratorRequest req = waitNext(self->interf_.haltBlobMigrator.getFuture())) {
                        req.reply.send(Void());
                        TraceEvent("BlobMigratorHalted", self->interf_.id()).detail("ReqID", req.requesterID);
                        break;
                    }
                    when(wait(self->actors_.getResult())) {}
                }
            } catch (Error& e) {
                dprint("Unexpected serverLoop error {}\n", e.what());
                throw;
            }
        }
        return Void();
    }

    // Handle StorageServerInterface APIs
    ACTOR static Future<Void> handleRequest(Reference<BlobMigrator> self) {
        state StorageServerInterface ssi = self->interf_.ssi;
        loop {
            try {
                choose {
                    when(GetShardStateRequest req = waitNext(ssi.getShardState.getFuture())) {
                        dprint("Handle GetShardStateRequest\n");
                        Version version = maxVersion(self);
                        GetShardStateReply rep(version, version);
                        req.reply.send(rep); // return empty shards
                    }
                    when(WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
                        // dprint("Handle WaitMetricsRequest\n");
                        self->actors_.add(processWaitMetricsRequest(self, req));
                    }
                    when(SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {
                        dprint("Handle SplitMetrics {}\n", req.keys.toString());
                        SplitMetricsReply rep;
                        for (auto granule : self->blobGranules_) {
                            // TODO: Use granule boundary as split point. A better approach is to split by size
                            if (granule.keyRange.begin > req.keys.begin && granule.keyRange.end < req.keys.end)
                                rep.splits.push_back_deep(rep.splits.arena(), granule.keyRange.begin);
                        }
                        req.reply.send(rep);
                    }
                    when(GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
                        fmt::print("Handle GetStorageMetrics\n");
                        StorageMetrics metrics;
                        metrics.bytes = sizeInBytes(self);
                        GetStorageMetricsReply resp;
                        resp.load = metrics;
                        req.reply.send(resp);
                    }
                    when(ReplyPromise<KeyValueStoreType> reply = waitNext(ssi.getKeyValueStoreType.getFuture())) {
                        dprint("Handle KeyValueStoreType\n");
                        reply.send(KeyValueStoreType::MEMORY);
                    }
                }
            } catch (Error& e) {
                dprint("Unexpected blob migrator request error {}\n", e.what());
                throw;
            }
        }
    }

    // Handle StorageServerInterface APIs that are not supported. Simply log and return an error
    ACTOR static Future<Void> handleUnsupportedRequest(Reference<BlobMigrator> self) {
        state StorageServerInterface ssi = self->interf_.ssi;
        loop {
            try {
                choose {
                    when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) {
                        dprint("Unsupported SplitRangeRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
                        self->actors_.add(processStorageQueuingMetricsRequest(req));
                    }
                    when(ReadHotSubRangeRequest req = waitNext(ssi.getReadHotRanges.getFuture())) {
                        dprint("Unsupported ReadHotSubRange\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(GetKeyValuesStreamRequest req = waitNext(ssi.getKeyValuesStream.getFuture())) {
                        dprint("Unsupported GetKeyValuesStreamRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
                        dprint("Unsupported GetKeyRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture())) {
                        /* dprint("Unsupported GetKeyValuesRequest {} - {} @ {}\n",
                           req.begin.getKey().printable(),
                           req.end.getKey().printable(),
                           req.version); */
                        req.reply.sendError(unsupported_operation());
                    }
                    when(GetValueRequest req = waitNext(ssi.getValue.getFuture())) {
                        dprint("Unsupported GetValueRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(GetCheckpointRequest req = waitNext(ssi.checkpoint.getFuture())) {
                        dprint("Unsupported GetCheckpoint \n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) {
                        dprint("Unsupported FetchCheckpointRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) {
                        dprint("Unsupported UpdateCommitCostRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                    when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
                        dprint("Unsupported FetchCheckpointKeyValuesRequest\n");
                        req.reply.sendError(unsupported_operation());
                    }
                }
            } catch (Error& e) {
                dprint("Unexpected request handling error {}\n", e.what());
                throw;
            }
        }
    }

    ACTOR static Future<Void> processWaitMetricsRequest(Reference<BlobMigrator> self, WaitMetricsRequest req) {
        state WaitMetricsRequest waitMetricsRequest = req;
        // FIXME: get rid of this delay. It's a temp solution to avoid starvation scheduling of DD
        // processes
        wait(delay(1));
        StorageMetrics metrics;
        metrics.bytes = sizeInBytes(self, waitMetricsRequest.keys);
        waitMetricsRequest.reply.send(metrics);
        return Void();
    }

    ACTOR static Future<Void> processStorageQueuingMetricsRequest(StorageQueuingMetricsRequest req) {
        dprint("Unsupported StorageQueuingMetricsRequest\n");
        // FIXME: get rid of this delay. It's a temp solution to avoid starvation scheduling of DD
        // processes
        wait(delay(1));
        req.reply.sendError(unsupported_operation());
        return Void();
    }

    // Return total storage size in bytes for migration
    static int64_t sizeInBytes(Reference<BlobMigrator> self) { return sizeInBytes(self, normalKeys); }

    // Return storage size in bytes for a given key range
    static int64_t sizeInBytes(Reference<BlobMigrator> self, KeyRangeRef range) {
        int64_t bytes = 0;
        for (auto granule : self->blobGranules_) {
            if (range.intersects(granule.keyRange))
                bytes += granule.sizeInBytes;
        }
        return bytes;
    }

    // Return the max version across all blob granules
    static Version maxVersion(Reference<BlobMigrator> self) {
        Version max = 0;
        for (auto granule : self->blobGranules_) {
            max = std::max(granule.version, max);
        }
        return max;
    }

private:
    Database db;
    Reference<BlobConnectionProvider> blobConn;
    BlobMigratorInterface blobMigratorInterf;
    ActorCollection actors;
    Database db_;
    Reference<BlobConnectionProvider> blobConn_;
    BlobGranuleRestoreVersionVector blobGranules_;
    BlobMigratorInterface interf_;
    ActorCollection actors_;
};

// Main entry point
ACTOR Future<Void> blobMigrator(BlobMigratorInterface ssi, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
    fmt::print("Start blob migrator {} \n", ssi.id().toString());
ACTOR Future<Void> blobMigrator(BlobMigratorInterface interf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
    fmt::print("Start blob migrator {} \n", interf.id().toString());
    try {
        Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, ssi);
        Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, interf);
        wait(BlobMigrator::start(self));
    } catch (Error& e) {
        fmt::print("unexpected blob migrator error {}\n", e.what());
        dprint("Unexpected blob migrator error {}\n", e.what());
        TraceEvent("BlobMigratorError", interf.id()).error(e);
    }
    return Void();
}

@@ -3961,7 +3961,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
        }
    }

    if (createChangeFeed) {
    if (createChangeFeed && !isFullRestoreMode()) {
        // create new change feed for new version of granule
        wait(updateChangeFeed(
            &tr, granuleIDToCFKey(info.granuleID), ChangeFeedStatus::CHANGE_FEED_CREATE, req.keyRange));

@@ -2615,8 +2615,9 @@ ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
    }
    loop {
        if (self->db.serverInfo->get().blobMigrator.present() && !self->recruitBlobMigrator.get()) {
            state Future<Void> wfClient = waitFailureClient(self->db.serverInfo->get().blobMigrator.get().waitFailure,
                                                            SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
            state Future<Void> wfClient =
                waitFailureClient(self->db.serverInfo->get().blobMigrator.get().ssi.waitFailure,
                                  SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
            loop {
                choose {
                    when(wait(wfClient)) {

@@ -140,9 +140,27 @@ private:
    Future<Void> collection;
};

// Granule info needed by a full restore
struct BlobGranuleRestoreVersion {
    // Two constructors required by VectorRef
    BlobGranuleRestoreVersion() {}
    BlobGranuleRestoreVersion(Arena& a, const BlobGranuleRestoreVersion& copyFrom)
      : granuleID(copyFrom.granuleID), keyRange(a, copyFrom.keyRange), version(copyFrom.version),
        sizeInBytes(copyFrom.sizeInBytes) {}

    UID granuleID;
    KeyRangeRef keyRange;
    Version version;
    int64_t sizeInBytes;
};

// A vector of BlobGranuleRestoreVersion
typedef Standalone<VectorRef<BlobGranuleRestoreVersion>> BlobGranuleRestoreVersionVector;

ACTOR Future<Void> dumpManifest(Database db, Reference<BlobConnectionProvider> blobConn, int64_t epoch, int64_t seqNo);
ACTOR Future<Void> loadManifest(Database db, Reference<BlobConnectionProvider> blobConn);
ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProvider> blobConn);
ACTOR Future<BlobGranuleRestoreVersionVector> listBlobGranules(Database db, Reference<BlobConnectionProvider> blobConn);
inline bool isFullRestoreMode() {
    return SERVER_KNOBS->BLOB_FULL_RESTORE_MODE;
};

@@ -30,23 +30,25 @@
struct BlobMigratorInterface {
    constexpr static FileIdentifier file_identifier = 869199;
    RequestStream<struct HaltBlobMigratorRequest> haltBlobMigrator;
    RequestStream<ReplyPromise<Void>> waitFailure;
    LocalityData locality;
    UID uniqueID;
    StorageServerInterface ssi;

    BlobMigratorInterface() {}
    BlobMigratorInterface(const struct LocalityData& l, UID id) : uniqueID(id), locality(l) {}
    BlobMigratorInterface(const struct LocalityData& l, UID id) : uniqueID(id), locality(l) {
        ssi.locality = l;
        ssi.uniqueID = id;
    }

    void initEndpoints() {}
    void initEndpoints() { ssi.initEndpoints(); }
    UID id() const { return uniqueID; }
    NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); }
    NetworkAddress address() const { return haltBlobMigrator.getEndpoint().getPrimaryAddress(); }
    bool operator==(const BlobMigratorInterface& r) const { return id() == r.id(); }
    bool operator!=(const BlobMigratorInterface& r) const { return !(*this == r); }

    template <class Archive>
    void serialize(Archive& ar) {
        // StorageServerInterface::serialize(ar);
        serializer(ar, waitFailure, haltBlobMigrator, locality, uniqueID);
        serializer(ar, locality, uniqueID, haltBlobMigrator);
    }
};

@@ -86,6 +86,7 @@
#include "fdbserver/TransactionTagCounter.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Error.h"

@@ -5976,27 +5977,26 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
                                       Reference<BlobConnectionProvider> blobConn) {
    ASSERT(blobConn.isValid());
    try {

        state Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(tryReadBlobGranules(tr, keys, fetchVersion));

        if (chunks.size() == 0) {
            throw blob_granule_transaction_too_old(); // no data on blob
        }

        if (!isRangeFullyCovered(keys, chunks)) {
            throw blob_granule_transaction_too_old();
        }

        for (const BlobGranuleChunkRef& chunk : chunks) {
            state KeyRangeRef chunkRange = chunk.keyRange;
            state RangeResult rows = wait(readBlobGranule(chunk, keys, 0, fetchVersion, blobConn));
        state int i;
        for (i = 0; i < chunks.size(); ++i) {
            state KeyRangeRef chunkRange = chunks[i].keyRange;
            state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
            TraceEvent("ReadBlobData")
                .detail("Rows", rows.size())
                .detail("ChunkRange", chunkRange.toString())
                .detail("Keys", keys.toString());

            if (rows.size() == 0) {
                rows.readThrough = KeyRef(rows.arena(), chunkRange.end);
                rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
            }
            if (i == chunks.size() - 1) {
                rows.readThrough = KeyRef(rows.arena(), keys.end);
            }
            results.send(rows);
        }

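Note: the readThrough changes above handle two edge cases: an empty chunk's progress marker must not run past the requested range (hence the std::min clamp), and the final chunk must close out the whole request even when its own range ends sooner. The bookkeeping in isolation:

#include <algorithm>
#include <string>
#include <utility>

// Compute the key through which results are complete for one chunk of a ranged read.
std::string readThroughFor(const std::pair<std::string, std::string>& chunkRange,
                           const std::string& requestEnd, bool isLastChunk, bool rowsEmpty) {
    std::string readThrough;
    if (rowsEmpty)
        readThrough = std::min(chunkRange.second, requestEnd); // don't overshoot the request
    if (isLastChunk)
        readThrough = requestEnd; // last chunk completes the whole request
    return readThrough;
}
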
@@ -6010,7 +6010,7 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
        tr->reset();
        tr->setVersion(fetchVersion);
        tr->trState->taskID = TaskPriority::FetchKeys;
        wait(tryGetRange(results, tr, keys)); // fall back to storage server
        throw;
    }
    return Void();
}

@@ -6798,8 +6798,10 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
    // We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to ensure
    // change feed mutations get applied correctly
    state std::vector<Key> changeFeedsToFetch;
    std::vector<Key> _cfToFetch = wait(fetchCFMetadata);
    changeFeedsToFetch = _cfToFetch;
    if (!isFullRestoreMode()) {
        std::vector<Key> _cfToFetch = wait(fetchCFMetadata);
        changeFeedsToFetch = _cfToFetch;
    }
    wait(data->durableVersionLock.take());

    shard->phase = AddingShard::Fetching;

@@ -2267,7 +2267,25 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
        CODE_PROBE(true, "Recruited while already a blob migrator.");
    } else {
        startRole(Role::BLOB_MIGRATOR, recruited.id(), interf.id());
        DUMPTOKEN(recruited.waitFailure);
        DUMPTOKEN(recruited.haltBlobMigrator);
        DUMPTOKEN(recruited.ssi.getValue);
        DUMPTOKEN(recruited.ssi.getKey);
        DUMPTOKEN(recruited.ssi.getKeyValues);
        DUMPTOKEN(recruited.ssi.getMappedKeyValues);
        DUMPTOKEN(recruited.ssi.getShardState);
        DUMPTOKEN(recruited.ssi.waitMetrics);
        DUMPTOKEN(recruited.ssi.splitMetrics);
        DUMPTOKEN(recruited.ssi.getReadHotRanges);
        DUMPTOKEN(recruited.ssi.getRangeSplitPoints);
        DUMPTOKEN(recruited.ssi.getStorageMetrics);
        DUMPTOKEN(recruited.ssi.waitFailure);
        DUMPTOKEN(recruited.ssi.getQueuingMetrics);
        DUMPTOKEN(recruited.ssi.getKeyValueStoreType);
        DUMPTOKEN(recruited.ssi.watchValue);
        DUMPTOKEN(recruited.ssi.getKeyValuesStream);
        DUMPTOKEN(recruited.ssi.changeFeedStream);
        DUMPTOKEN(recruited.ssi.changeFeedPop);
        DUMPTOKEN(recruited.ssi.changeFeedVersionUpdate);

        Future<Void> blobMigratorProcess = blobMigrator(recruited, dbInfo);
        errorForwarders.add(forwardError(errors,