add a blob migrator

This commit is contained in:
Hui Liu 2022-09-02 19:21:52 -07:00
parent 06d9ebd620
commit 049df622f1
14 changed files with 446 additions and 9 deletions

View File

@ -570,6 +570,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( RATEKEEPER_FAILURE_TIME, 1.0 );
init( CONSISTENCYSCAN_FAILURE_TIME, 1.0 );
init( BLOB_MANAGER_FAILURE_TIME, 1.0 );
init( BLOB_MIGRATOR_FAILURE_TIME, 1.0 );
init( REPLACE_INTERFACE_DELAY, 60.0 );
init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 );
init( COORDINATOR_REGISTER_INTERVAL, 5.0 );

View File

@ -480,6 +480,7 @@ public:
double RATEKEEPER_FAILURE_TIME;
double CONSISTENCYSCAN_FAILURE_TIME;
double BLOB_MANAGER_FAILURE_TIME;
double BLOB_MIGRATOR_FAILURE_TIME;
double REPLACE_INTERFACE_DELAY;
double REPLACE_INTERFACE_CHECK_DELAY;
double COORDINATOR_REGISTER_INTERVAL;

View File

@ -283,6 +283,15 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default:
return ProcessClass::NeverAssign;
}
case ProcessClass::BlobMigrator:
switch (_class) {
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
default:
return ProcessClass::NeverAssign;
}
case ProcessClass::StorageCache:
switch (_class) {
case ProcessClass::StorageCacheClass:

View File

@ -50,6 +50,7 @@ struct ProcessClass {
BlobWorkerClass,
EncryptKeyProxyClass,
ConsistencyScanClass,
BlobMigratorClass,
InvalidClass = -1
};
@ -77,6 +78,7 @@ struct ProcessClass {
static_assert(ProcessClass::BlobWorkerClass == 19);
static_assert(ProcessClass::EncryptKeyProxyClass == 20);
static_assert(ProcessClass::ConsistencyScanClass == 21);
static_assert(ProcessClass::BlobMigratorClass == 22);
static_assert(ProcessClass::InvalidClass == -1);
enum Fitness {
@ -102,6 +104,7 @@ struct ProcessClass {
ConsistencyScan,
BlobManager,
BlobWorker,
BlobMigrator,
StorageCache,
Backup,
EncryptKeyProxy,

View File

@ -0,0 +1,83 @@
/*
* BlobMigrator.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/BlobConnectionProvider.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/WaitFailure.h"
#include "flow/actorcompiler.h" // has to be last include
// BlobMigrator manages data migration from blob storage to storage server. It implements a minimal set of
// StorageServerInterface APIs which are needed for DataDistributor to start data migration.
class BlobMigrator : public NonCopyable, public ReferenceCounted<BlobMigrator> {
public:
// Stores the migrator interface, creates a blob connection provider for BG_URL unless
// BG_METADATA_SOURCE is "tenant", and opens a lock-aware Database handle from dbInfo.
BlobMigrator(Reference<AsyncVar<ServerDBInfo> const> dbInfo, BlobMigratorInterface interf)
: blobMigratorInterf(interf), actors(false) {
// NOTE(review): blobConn is default-constructed at this point, so the !isValid() half of this
// condition looks like it is always true - confirm whether the check is needed.
if (!blobConn.isValid() && SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
blobConn = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
}
db = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
}
~BlobMigrator() {}
// Runs the migrator: registers a waitFailure server for the interface, then loops until a
// HaltBlobMigratorRequest arrives. The halt request is acknowledged and traced before returning.
ACTOR static Future<Void> start(Reference<BlobMigrator> self) {
self->actors.add(waitFailureServer(self->blobMigratorInterf.waitFailure.getFuture()));
loop {
choose {
when(HaltBlobMigratorRequest req = waitNext(self->blobMigratorInterf.haltBlobMigrator.getFuture())) {
// Reply first so the requester is unblocked, then record who asked for the halt.
req.reply.send(Void());
TraceEvent("BlobMigratorHalted", self->blobMigratorInterf.id()).detail("ReqID", req.requesterID);
break;
}
// Waits on the actor collection's result; presumably this is how errors from the
// background actors surface here - confirm against flow/ActorCollection.h.
when(wait(self->actors.getResult())) {}
}
}
return Void();
}
private:
// Database handle opened in the constructor (not otherwise used in this block).
Database db;
// Blob store connection; left unset when BG_METADATA_SOURCE is "tenant".
Reference<BlobConnectionProvider> blobConn;
// Interface whose waitFailure and haltBlobMigrator streams are served by start().
BlobMigratorInterface blobMigratorInterf;
// Background actors owned by this migrator.
ActorCollection actors;
};
// Main entry point
// Role entry point invoked by the worker: builds a BlobMigrator for the given interface and runs it
// until it is halted or fails.
//
// ssi    - interface (endpoints, locality, id) this migrator serves.
// dbInfo - server database info used to open the migrator's Database handle.
//
// Errors are not propagated to the caller; the actor always completes with Void. They are written
// to the trace log as well as stdout so that a failure is visible in the normal diagnostics.
ACTOR Future<Void> blobMigrator(BlobMigratorInterface ssi, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
	fmt::print("Start blob migrator {} \n", ssi.id().toString());
	try {
		Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, ssi);
		wait(BlobMigrator::start(self));
	} catch (Error& e) {
		fmt::print("unexpected blob migrator error {}\n", e.what());
		// stdout alone is easy to miss on a server process; record the error in the trace log too.
		TraceEvent("BlobMigratorError", ssi.id()).error(e);
	}
	return Void();
}

View File

@ -29,6 +29,8 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/EncryptKeyProxyInterface.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
@ -198,6 +200,32 @@ struct BlobManagerSingleton : Singleton<BlobManagerInterface> {
}
};
// Cluster-controller wrapper describing the blob migrator singleton role: how to publish its
// interface, how to halt a running instance, and how to request a new recruitment.
struct BlobMigratorSingleton : Singleton<BlobMigratorInterface> {
	BlobMigratorSingleton(const Optional<BlobMigratorInterface>& interface) : Singleton(interface) {}

	Role getRole() const { return Role::BLOB_MIGRATOR; }
	ProcessClass::ClusterRole getClusterRole() const { return ProcessClass::BlobMigrator; }

	// Publishes the wrapped interface into the cluster controller's db info; no-op when empty.
	void setInterfaceToDbInfo(ClusterControllerData* cc) const {
		if (!interface.present()) {
			return;
		}
		const BlobMigratorInterface& migrator = interface.get();
		TraceEvent("CCMG_SetInf", cc->id).detail("Id", migrator.id());
		cc->db.setBlobMigrator(migrator);
	}

	// Sends a halt request to the wrapped migrator and stores the pending reply on the worker
	// entry for `pid`; no-op when no interface is wrapped.
	void halt(ClusterControllerData* cc, Optional<Standalone<StringRef>> pid) const {
		if (!interface.present()) {
			return;
		}
		const BlobMigratorInterface& migrator = interface.get();
		TraceEvent("CCMG_Halt", cc->id).detail("Id", migrator.id());
		cc->id_worker[pid].haltBlobMigrator =
		    brokenPromiseToNever(migrator.haltBlobMigrator.getReply(HaltBlobMigratorRequest(cc->id)));
	}

	// Records the recruitment time and raises the flag that asks for a new blob migrator.
	void recruit(ClusterControllerData* cc) const {
		cc->lastRecruitTime = now();
		cc->recruitBlobMigrator.set(true);
	}
};
struct EncryptKeyProxySingleton : Singleton<EncryptKeyProxyInterface> {
EncryptKeyProxySingleton(const Optional<EncryptKeyProxyInterface>& interface) : Singleton(interface) {}
@ -275,6 +303,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
dbInfo.blobManager = db->serverInfo->get().blobManager;
dbInfo.blobMigrator = db->serverInfo->get().blobMigrator;
dbInfo.encryptKeyProxy = db->serverInfo->get().encryptKeyProxy;
dbInfo.consistencyScan = db->serverInfo->get().consistencyScan;
dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig;
@ -656,8 +685,12 @@ void checkBetterSingletons(ClusterControllerData* self) {
WorkerDetails newCSWorker = findNewProcessForSingleton(self, ProcessClass::ConsistencyScan, id_used);
WorkerDetails newBMWorker;
WorkerDetails newMGWorker;
if (self->db.blobGranulesEnabled.get()) {
newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
if (isFullRestoreMode()) {
newMGWorker = findNewProcessForSingleton(self, ProcessClass::BlobMigrator, id_used);
}
}
WorkerDetails newEKPWorker;
@ -671,8 +704,12 @@ void checkBetterSingletons(ClusterControllerData* self) {
auto bestFitnessForCS = findBestFitnessForSingleton(self, newCSWorker, ProcessClass::ConsistencyScan);
ProcessClass::Fitness bestFitnessForBM;
ProcessClass::Fitness bestFitnessForMG;
if (self->db.blobGranulesEnabled.get()) {
	bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
	if (isFullRestoreMode()) {
		// The migrator candidate must be scored against the BlobMigrator role (the role it was
		// searched for above), not the BlobManager role.
		bestFitnessForMG = findBestFitnessForSingleton(self, newMGWorker, ProcessClass::BlobMigrator);
	}
}
ProcessClass::Fitness bestFitnessForEKP;
@ -685,6 +722,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
auto ddSingleton = DataDistributorSingleton(db.distributor);
ConsistencyScanSingleton csSingleton(db.consistencyScan);
BlobManagerSingleton bmSingleton(db.blobManager);
BlobMigratorSingleton mgSingleton(db.blobMigrator);
EncryptKeyProxySingleton ekpSingleton(db.encryptKeyProxy);
// Check if the singletons are healthy.
@ -699,9 +737,14 @@ void checkBetterSingletons(ClusterControllerData* self) {
self, newCSWorker, csSingleton, bestFitnessForCS, self->recruitingConsistencyScanID);
bool bmHealthy = true;
bool mgHealthy = true;
if (self->db.blobGranulesEnabled.get()) {
bmHealthy = isHealthySingleton<BlobManagerInterface>(
self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
if (isFullRestoreMode()) {
mgHealthy = isHealthySingleton<BlobMigratorInterface>(
self, newMGWorker, mgSingleton, bestFitnessForMG, self->recruitingBlobMigratorID);
}
}
bool ekpHealthy = true;
@ -711,7 +754,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
}
// if any of the singletons are unhealthy (rerecruited or not stable), then do not
// consider any further re-recruitments
if (!(rkHealthy && ddHealthy && bmHealthy && ekpHealthy && csHealthy)) {
if (!(rkHealthy && ddHealthy && bmHealthy && ekpHealthy && csHealthy && mgHealthy)) {
return;
}
@ -725,9 +768,14 @@ void checkBetterSingletons(ClusterControllerData* self) {
Optional<Standalone<StringRef>> newCSProcessId = newCSWorker.interf.locality.processId();
Optional<Standalone<StringRef>> currBMProcessId, newBMProcessId;
Optional<Standalone<StringRef>> currMGProcessId, newMGProcessId;
if (self->db.blobGranulesEnabled.get()) {
currBMProcessId = bmSingleton.interface.get().locality.processId();
newBMProcessId = newBMWorker.interf.locality.processId();
if (isFullRestoreMode()) {
currMGProcessId = mgSingleton.interface.get().locality.processId();
newMGProcessId = newMGWorker.interf.locality.processId();
}
}
Optional<Standalone<StringRef>> currEKPProcessId, newEKPProcessId;
@ -741,6 +789,10 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (self->db.blobGranulesEnabled.get()) {
currPids.emplace_back(currBMProcessId);
newPids.emplace_back(newBMProcessId);
if (isFullRestoreMode()) {
currPids.emplace_back(currMGProcessId);
newPids.emplace_back(newMGProcessId);
}
}
if (SERVER_KNOBS->ENABLE_ENCRYPTION) {
@ -755,6 +807,10 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (!self->db.blobGranulesEnabled.get()) {
ASSERT(currColocMap[currBMProcessId] == 0);
ASSERT(newColocMap[newBMProcessId] == 0);
if (isFullRestoreMode()) {
ASSERT(currColocMap[currMGProcessId] == 0);
ASSERT(newColocMap[newMGProcessId] == 0);
}
}
// if the knob is disabled, the EKP coloc counts should have no effect on the coloc counts check below
@ -767,6 +823,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (newColocMap[newRKProcessId] <= currColocMap[currRKProcessId] &&
newColocMap[newDDProcessId] <= currColocMap[currDDProcessId] &&
newColocMap[newBMProcessId] <= currColocMap[currBMProcessId] &&
newColocMap[newMGProcessId] <= currColocMap[currMGProcessId] &&
newColocMap[newEKPProcessId] <= currColocMap[currEKPProcessId] &&
newColocMap[newCSProcessId] <= currColocMap[currCSProcessId]) {
// rerecruit the singleton for which we have found a better process, if any
@ -776,6 +833,9 @@ void checkBetterSingletons(ClusterControllerData* self) {
ddSingleton.recruit(self);
} else if (self->db.blobGranulesEnabled.get() && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
bmSingleton.recruit(self);
} else if (self->db.blobGranulesEnabled.get() && isFullRestoreMode() &&
newColocMap[newMGProcessId] < currColocMap[currMGProcessId]) {
mgSingleton.recruit(self);
} else if (SERVER_KNOBS->ENABLE_ENCRYPTION && newColocMap[newEKPProcessId] < currColocMap[currEKPProcessId]) {
ekpSingleton.recruit(self);
} else if (newColocMap[newCSProcessId] < currColocMap[currCSProcessId]) {
@ -1330,12 +1390,18 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID);
}
// Handle a registering blob manager whenever blob granules are enabled. This must not depend on
// full-restore mode: that mode only governs the blob migrator, and gating on it here would skip
// the duplicate-singleton resolution for blob managers during normal operation.
if (self->db.blobGranulesEnabled.get() && req.blobManagerInterf.present()) {
	auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager);
	auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf);
	haltRegisteringOrCurrentSingleton<BlobManagerInterface>(
	    self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID);
}
if (req.blobMigratorInterf.present()) {
auto currSingleton = BlobMigratorSingleton(self->db.serverInfo->get().blobMigrator);
auto registeringSingleton = BlobMigratorSingleton(req.blobMigratorInterf);
haltRegisteringOrCurrentSingleton<BlobMigratorInterface>(
self, w, currSingleton, registeringSingleton, self->recruitingBlobMigratorID);
}
if (SERVER_KNOBS->ENABLE_ENCRYPTION && req.encryptKeyProxyInterf.present()) {
auto currSingleton = EncryptKeyProxySingleton(self->db.serverInfo->get().encryptKeyProxy);
@ -2426,6 +2492,104 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
}
}
// Recruits a blob migrator. After an initial delay of waitTime, loops until some worker answers an
// InitializeBlobMigratorRequest; the returned interface is published via setBlobMigrator, and a
// different, previously published migrator is halted if its process is still a known worker.
// Returns early if a migrator registers on its own while this actor is waiting for recovery.
// Errors other than no_more_servers are rethrown.
ACTOR Future<Void> startBlobMigrator(ClusterControllerData* self, double waitTime) {
// If master fails at the same time, give it a chance to clear master PID.
// Also wait to avoid too many consecutive recruits in a small time window.
wait(delay(waitTime));
TraceEvent("CCStartBlobMigrator", self->id).log();
loop {
try {
// Remember whether a migrator was already published before waiting on recovery below.
state bool noBlobMigrator = !self->db.serverInfo->get().blobMigrator.present();
// Hold off until the master process id is known, matches the published master, and
// recovery has reached ACCEPTING_COMMITS.
while (!self->masterProcessId.present() ||
self->masterProcessId != self->db.serverInfo->get().master.locality.processId() ||
self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (noBlobMigrator && self->db.serverInfo->get().blobMigrator.present()) {
// Existing instance registers while waiting, so skip.
return Void();
}
// Pick a worker for the role in the cluster controller's datacenter.
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
WorkerFitnessInfo blobMigratorWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId,
ProcessClass::BlobMigrator,
ProcessClass::NeverAssign,
self->db.config,
id_used);
InitializeBlobMigratorRequest req(deterministicRandom()->randomUniqueID());
state WorkerDetails worker = blobMigratorWorker.worker;
// Use the master's process instead when onMasterIsBetter() says so.
if (self->onMasterIsBetter(worker, ProcessClass::BlobMigrator)) {
worker = self->id_worker[self->masterProcessId.get()].details;
}
// Track the in-flight recruitment by request id until the worker replies.
self->recruitingBlobMigratorID = req.reqId;
TraceEvent("CCRecruitBlobMigrator", self->id)
.detail("Addr", worker.interf.address())
.detail("MGID", req.reqId);
// NOTE(review): this reuses WAIT_FOR_BLOB_MANAGER_JOIN_DELAY; no migrator-specific knob is
// visible in this change - confirm that sharing the blob manager's knob is intended.
ErrorOr<BlobMigratorInterface> interf = wait(worker.interf.blobMigrator.getReplyUnlessFailedFor(
req, SERVER_KNOBS->WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 0));
if (interf.present()) {
self->recruitBlobMigrator.set(false);
self->recruitingBlobMigratorID = interf.get().id();
const auto& blobMigrator = self->db.serverInfo->get().blobMigrator;
TraceEvent("CCBlobMigratorRecruited", self->id)
.detail("Addr", worker.interf.address())
.detail("MGID", interf.get().id());
// Halt a previously published migrator that differs from the new one, provided its
// process is still registered with this cluster controller.
if (blobMigrator.present() && blobMigrator.get().id() != interf.get().id() &&
self->id_worker.count(blobMigrator.get().locality.processId())) {
TraceEvent("CCHaltBlobMigratorAfterRecruit", self->id)
.detail("MGID", blobMigrator.get().id())
.detail("DcID", printable(self->clusterControllerDcId));
BlobMigratorSingleton(blobMigrator).halt(self, blobMigrator.get().locality.processId());
}
// Publish the new interface unless the same one is already published.
if (!blobMigrator.present() || blobMigrator.get().id() != interf.get().id()) {
self->db.setBlobMigrator(interf.get());
}
checkOutstandingRequests(self);
return Void();
}
} catch (Error& e) {
TraceEvent("CCBlobMigratorRecruitError", self->id).error(e);
// Only no_more_servers is retried; every other error propagates to the caller.
if (e.code() != error_code_no_more_servers) {
throw;
}
}
// Back off before the next recruitment attempt.
wait(lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY));
}
}
// Keeps the blob migrator singleton alive for the lifetime of the cluster controller.
//
// Once recovery has reached ACCEPTING_COMMITS, this actor loops forever:
//  - while a migrator is published and no re-recruitment is requested, it watches the migrator
//    for failure and clears the published interface when it dies;
//  - when no usable migrator exists and blob granules are enabled in full-restore mode, it
//    recruits one via startBlobMigrator (throttled by recruitThrottler);
//  - otherwise it sleeps until the blob-granules-enabled setting changes.
ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
	state SingletonRecruitThrottler recruitThrottler;
	while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
		wait(self->db.serverInfo->onChange());
	}
	loop {
		if (self->db.serverInfo->get().blobMigrator.present() && !self->recruitBlobMigrator.get()) {
			state Future<Void> wfClient = waitFailureClient(self->db.serverInfo->get().blobMigrator.get().waitFailure,
			                                                SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
			loop {
				choose {
					when(wait(wfClient)) {
						TraceEvent("CCBlobMigratorDied", self->id)
						    .detail("MGID", self->db.serverInfo->get().blobMigrator.get().id());
						self->db.clearInterf(ProcessClass::BlobMigratorClass);
						break;
					}
					when(wait(self->recruitBlobMigrator.onChange())) {
						// A re-recruitment was requested (e.g. a better-fit process was found).
						// Leave the failure watch so the outer loop can act on the flag; staying
						// here would ignore the request until the current migrator fails.
						break;
					}
				}
			}
		} else if (self->db.blobGranulesEnabled.get() && isFullRestoreMode()) {
			// No usable blob migrator, but blob granules are enabled in full-restore mode:
			// recruit a blob migrator.
			wait(startBlobMigrator(self, recruitThrottler.newRecruitment()));
		} else {
			wait(self->db.blobGranulesEnabled.onChange());
		}
	}
}
ACTOR Future<Void> startBlobManager(ClusterControllerData* self, double waitTime) {
// If master fails at the same time, give it a chance to clear master PID.
// Also wait to avoid too many consecutive recruits in a small time window.
@ -2552,6 +2716,10 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
const auto& blobManager = self->db.serverInfo->get().blobManager;
BlobManagerSingleton(blobManager)
.haltBlobGranules(self, blobManager.get().locality.processId());
if (isFullRestoreMode()) {
	const auto& blobMigrator = self->db.serverInfo->get().blobMigrator;
	// Full-restore mode does not guarantee that a migrator has been recruited yet; only
	// dereference the Optional when an interface is actually published.
	if (blobMigrator.present()) {
		BlobMigratorSingleton(blobMigrator).halt(self, blobMigrator.get().locality.processId());
	}
}
break;
}
}
@ -2785,6 +2953,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
self.addActor.send(monitorBlobManager(&self));
self.addActor.send(monitorBlobMigrator(&self));
self.addActor.send(watchBlobGranulesConfigKey(&self));
self.addActor.send(monitorConsistencyScan(&self));
self.addActor.send(metaclusterMetricsUpdater(&self));

View File

@ -828,6 +828,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
roles.addRole("blob_manager", db->get().blobManager.get());
}
if (configuration.present() && configuration.get().blobGranulesEnabled && db->get().blobMigrator.present()) {
roles.addRole("blob_migrator", db->get().blobMigrator.get());
}
if (db->get().consistencyScan.present()) {
roles.addRole("consistency_scan", db->get().consistencyScan.get());
}

View File

@ -1,5 +1,5 @@
/*
* BlobGranuleServerCommon.h
* BlobGranuleServerCommon.actor.h
*
* This source file is part of the FoundationDB open source project
*

View File

@ -0,0 +1,67 @@
/*
* BlobMigratorInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_BLOBMIGRATORINTERFACE_H
#define FDBSERVER_BLOBMIGRATORINTERFACE_H
#pragma once
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/fdbrpc.h"
// Network interface of a blob migrator role: the request streams it serves plus the locality and
// id that identify it. Two interfaces compare equal when their ids are equal.
struct BlobMigratorInterface {
	constexpr static FileIdentifier file_identifier = 869199;
	// Asks the migrator to stop.
	RequestStream<struct HaltBlobMigratorRequest> haltBlobMigrator;
	// Used by peers to detect that the migrator has failed.
	RequestStream<ReplyPromise<Void>> waitFailure;
	LocalityData locality;
	UID uniqueID;

	BlobMigratorInterface() {}
	// Initializers are listed in declaration order (locality, then uniqueID), which is the order
	// in which members are actually initialized; this avoids a -Wreorder warning.
	BlobMigratorInterface(const struct LocalityData& l, UID id) : locality(l), uniqueID(id) {}

	void initEndpoints() {}
	UID id() const { return uniqueID; }
	// Primary address of the waitFailure endpoint.
	NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); }
	bool operator==(const BlobMigratorInterface& r) const { return id() == r.id(); }
	bool operator!=(const BlobMigratorInterface& r) const { return !(*this == r); }

	// The field order below defines the serialized layout; keep it stable.
	template <class Archive>
	void serialize(Archive& ar) {
		// StorageServerInterface::serialize(ar);
		serializer(ar, waitFailure, haltBlobMigrator, locality, uniqueID);
	}
};
// Request sent on BlobMigratorInterface::haltBlobMigrator to stop a blob migrator.
struct HaltBlobMigratorRequest {
constexpr static FileIdentifier file_identifier = 4980139;
// Id of the party requesting the halt.
UID requesterID;
// Completed with Void by the receiver to acknowledge the request.
ReplyPromise<Void> reply;
HaltBlobMigratorRequest() {}
explicit HaltBlobMigratorRequest(UID uid) : requesterID(uid) {}
// Serializes requesterID followed by reply.
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, reply);
}
};
#endif

View File

@ -22,6 +22,8 @@
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/BlobMigratorInterface.h"
#include <utility>
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_CLUSTERCONTROLLER_ACTOR_G_H)
@ -51,6 +53,7 @@ struct WorkerInfo : NonCopyable {
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
Future<Void> haltBlobManager;
Future<Void> haltBlobMigrator;
Future<Void> haltEncryptKeyProxy;
Future<Void> haltConsistencyScan;
Standalone<VectorRef<StringRef>> issues;
@ -184,6 +187,14 @@ public:
serverInfo->set(newInfo);
}
// Publishes `interf` as the current blob migrator: copies the current server info, attaches the
// interface, stamps the copy with a new random id and the next info generation, and stores it back.
void setBlobMigrator(const BlobMigratorInterface& interf) {
	auto updatedInfo = serverInfo->get();
	updatedInfo.blobMigrator = interf;
	updatedInfo.id = deterministicRandom()->randomUniqueID();
	updatedInfo.infoGeneration = ++dbInfoCount;
	serverInfo->set(updatedInfo);
}
void setEncryptKeyProxy(const EncryptKeyProxyInterface& interf) {
auto newInfo = serverInfo->get();
auto newClientInfo = clientInfo->get();
@ -217,6 +228,8 @@ public:
newInfo.ratekeeper = Optional<RatekeeperInterface>();
} else if (t == ProcessClass::BlobManagerClass) {
newInfo.blobManager = Optional<BlobManagerInterface>();
} else if (t == ProcessClass::BlobMigratorClass) {
newInfo.blobMigrator = Optional<BlobMigratorInterface>();
} else if (t == ProcessClass::EncryptKeyProxyClass) {
newInfo.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
newInfo.client.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
@ -317,6 +330,8 @@ public:
db.serverInfo->get().ratekeeper.get().locality.processId() == processId) ||
(db.serverInfo->get().blobManager.present() &&
db.serverInfo->get().blobManager.get().locality.processId() == processId) ||
(db.serverInfo->get().blobMigrator.present() &&
db.serverInfo->get().blobMigrator.get().locality.processId() == processId) ||
(db.serverInfo->get().encryptKeyProxy.present() &&
db.serverInfo->get().encryptKeyProxy.get().locality.processId() == processId) ||
(db.serverInfo->get().consistencyScan.present() &&
@ -3360,6 +3375,8 @@ public:
Optional<UID> recruitingRatekeeperID;
AsyncVar<bool> recruitBlobManager;
Optional<UID> recruitingBlobManagerID;
AsyncVar<bool> recruitBlobMigrator;
Optional<UID> recruitingBlobMigratorID;
AsyncVar<bool> recruitEncryptKeyProxy;
Optional<UID> recruitingEncryptKeyProxyID;
AsyncVar<bool> recruitConsistencyScan;
@ -3401,8 +3418,9 @@ public:
ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), startTime(now()),
goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0),
versionDifferenceUpdated(false), remoteDCMonitorStarted(false), remoteTransactionSystemDegraded(false),
recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), recruitEncryptKeyProxy(false),
recruitConsistencyScan(false), clusterControllerMetrics("ClusterController", id.toString()),
recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), recruitBlobMigrator(false),
recruitEncryptKeyProxy(false), recruitConsistencyScan(false),
clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),
getWorkersRequests("GetWorkersRequests", clusterControllerMetrics),

View File

@ -26,12 +26,13 @@
#define FDBSERVER_SERVERDBINFO_H
#pragma once
#include "fdbclient/ConsistencyScanInterface.actor.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbclient/ConsistencyScanInterface.actor.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LatencyBandConfig.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -50,6 +51,7 @@ struct ServerDBInfo {
MasterInterface master; // The best guess as to the most recent master, which might still be recovering
Optional<RatekeeperInterface> ratekeeper;
Optional<BlobManagerInterface> blobManager;
Optional<BlobMigratorInterface> blobMigrator;
Optional<EncryptKeyProxyInterface> encryptKeyProxy;
Optional<ConsistencyScanInterface> consistencyScan;
std::vector<ResolverInterface> resolvers;
@ -84,6 +86,7 @@ struct ServerDBInfo {
master,
ratekeeper,
blobManager,
blobMigrator,
encryptKeyProxy,
consistencyScan,
resolvers,

View File

@ -33,6 +33,7 @@
#include "fdbserver/RatekeeperInterface.h"
#include "fdbclient/ConsistencyScanInterface.actor.h"
#include "fdbserver/BlobManagerInterface.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/ResolverInterface.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/ClientBooleanParams.h"
@ -59,6 +60,7 @@ struct WorkerInterface {
RequestStream<struct InitializeBlobManagerRequest> blobManager;
RequestStream<struct InitializeBlobWorkerRequest> blobWorker;
RequestStream<struct InitializeConsistencyScanRequest> consistencyScan;
RequestStream<struct InitializeBlobMigratorRequest> blobMigrator;
RequestStream<struct InitializeResolverRequest> resolver;
RequestStream<struct InitializeStorageRequest> storage;
RequestStream<struct InitializeLogRouterRequest> logRouter;
@ -115,6 +117,7 @@ struct WorkerInterface {
blobManager,
blobWorker,
consistencyScan,
blobMigrator,
resolver,
storage,
logRouter,
@ -430,6 +433,7 @@ struct RegisterWorkerRequest {
Optional<DataDistributorInterface> distributorInterf;
Optional<RatekeeperInterface> ratekeeperInterf;
Optional<BlobManagerInterface> blobManagerInterf;
Optional<BlobMigratorInterface> blobMigratorInterf;
Optional<EncryptKeyProxyInterface> encryptKeyProxyInterf;
Optional<ConsistencyScanInterface> consistencyScanInterf;
Standalone<VectorRef<StringRef>> issues;
@ -452,6 +456,7 @@ struct RegisterWorkerRequest {
Optional<DataDistributorInterface> ddInterf,
Optional<RatekeeperInterface> rkInterf,
Optional<BlobManagerInterface> bmInterf,
Optional<BlobMigratorInterface> mgInterf,
Optional<EncryptKeyProxyInterface> ekpInterf,
Optional<ConsistencyScanInterface> csInterf,
bool degraded,
@ -461,9 +466,10 @@ struct RegisterWorkerRequest {
ConfigBroadcastInterface configBroadcastInterface)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
encryptKeyProxyInterf(ekpInterf), consistencyScanInterf(csInterf), degraded(degraded),
lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet), requestDbInfo(false),
recoveredDiskFiles(recoveredDiskFiles), configBroadcastInterface(configBroadcastInterface) {}
blobMigratorInterf(mgInterf), encryptKeyProxyInterf(ekpInterf), consistencyScanInterf(csInterf),
degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet),
requestDbInfo(false), recoveredDiskFiles(recoveredDiskFiles),
configBroadcastInterface(configBroadcastInterface) {}
template <class Ar>
void serialize(Ar& ar) {
@ -476,6 +482,7 @@ struct RegisterWorkerRequest {
distributorInterf,
ratekeeperInterf,
blobManagerInterf,
blobMigratorInterf,
encryptKeyProxyInterf,
consistencyScanInterf,
issues,
@ -762,6 +769,19 @@ struct InitializeBlobManagerRequest {
}
};
// Request sent on WorkerInterface::blobMigrator asking a worker to run the blob migrator role.
struct InitializeBlobMigratorRequest {
constexpr static FileIdentifier file_identifier = 7932681;
// Id of this recruitment request.
UID reqId;
// Completed with the interface of the migrator running on the worker.
ReplyPromise<BlobMigratorInterface> reply;
InitializeBlobMigratorRequest() {}
explicit InitializeBlobMigratorRequest(UID uid) : reqId(uid) {}
// Serializes reqId followed by reply.
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reqId, reply);
}
};
struct InitializeResolverRequest {
constexpr static FileIdentifier file_identifier = 7413317;
LifetimeToken masterLifetime;
@ -1006,6 +1026,7 @@ struct Role {
static const Role RATEKEEPER;
static const Role BLOB_MANAGER;
static const Role BLOB_WORKER;
static const Role BLOB_MIGRATOR;
static const Role STORAGE_CACHE;
static const Role COORDINATOR;
static const Role BACKUP;
@ -1042,6 +1063,8 @@ struct Role {
return BLOB_MANAGER;
case ProcessClass::BlobWorker:
return BLOB_WORKER;
case ProcessClass::BlobMigrator:
return BLOB_MIGRATOR;
case ProcessClass::StorageCache:
return STORAGE_CACHE;
case ProcessClass::Backup:
@ -1173,6 +1196,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<Async
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> consistencyScan(ConsistencyScanInterface csInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
ACTOR Future<Void> blobManager(BlobManagerInterface bmi, Reference<AsyncVar<ServerDBInfo> const> db, int64_t epoch);
ACTOR Future<Void> blobMigrator(BlobMigratorInterface mgi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db);

View File

@ -23,6 +23,7 @@
#include <boost/lexical_cast.hpp>
#include "fdbclient/FDBTypes.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "flow/ApiVersion.h"
#include "flow/IAsyncFile.h"
#include "fdbrpc/Locality.h"
@ -561,6 +562,7 @@ ACTOR Future<Void> registrationClient(
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>> const> bmInterf,
Reference<AsyncVar<Optional<BlobMigratorInterface>> const> blobMigratorInterf,
Reference<AsyncVar<Optional<EncryptKeyProxyInterface>> const> ekpInterf,
Reference<AsyncVar<Optional<ConsistencyScanInterface>> const> csInterf,
Reference<AsyncVar<bool> const> degraded,
@ -602,6 +604,7 @@ ACTOR Future<Void> registrationClient(
ddInterf->get(),
rkInterf->get(),
bmInterf->get().present() ? bmInterf->get().get().second : Optional<BlobManagerInterface>(),
blobMigratorInterf->get(),
ekpInterf->get(),
csInterf->get(),
degraded->get(),
@ -674,6 +677,7 @@ ACTOR Future<Void> registrationClient(
when(wait(rkInterf->onChange())) { break; }
when(wait(csInterf->onChange())) { break; }
when(wait(bmInterf->onChange())) { break; }
when(wait(blobMigratorInterf->onChange())) { break; }
when(wait(ekpInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
@ -707,6 +711,10 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<S
return true;
}
if (dbi.blobMigrator.present() && dbi.blobMigrator.get().address() == address) {
return true;
}
if (dbi.encryptKeyProxy.present() && dbi.encryptKeyProxy.get().address() == address) {
return true;
}
@ -1651,6 +1659,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
state Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>> bmEpochAndInterf(
new AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>());
state Reference<AsyncVar<Optional<BlobMigratorInterface>>> blobMigratorInterf(
new AsyncVar<Optional<BlobMigratorInterface>>());
state UID lastBMRecruitRequestId;
state Reference<AsyncVar<Optional<EncryptKeyProxyInterface>>> ekpInterf(
new AsyncVar<Optional<EncryptKeyProxyInterface>>());
@ -1977,6 +1987,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ddInterf,
rkInterf,
bmEpochAndInterf,
blobMigratorInterf,
ekpInterf,
csInterf,
degraded,
@ -2023,6 +2034,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
localInfo.distributor.present() ? localInfo.distributor.get().id() : UID())
.detail("BlobManagerID",
localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID())
.detail("BlobMigratorID",
localInfo.blobMigrator.present() ? localInfo.blobMigrator.get().id() : UID())
.detail("EncryptKeyProxyID",
localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID());
@ -2242,6 +2255,31 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id());
req.reply.send(recruited);
}
when(InitializeBlobMigratorRequest req = waitNext(interf.blobMigrator.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobMigrator;
BlobMigratorInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (blobMigratorInterf->get().present()) {
recruited = blobMigratorInterf->get().get();
CODE_PROBE(true, "Recruited while already a blob migrator.");
} else {
startRole(Role::BLOB_MIGRATOR, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
Future<Void> blobMigratorProcess = blobMigrator(recruited, dbInfo);
errorForwarders.add(forwardError(errors,
Role::BLOB_MIGRATOR,
recruited.id(),
setWhenDoneOrError(blobMigratorProcess,
blobMigratorInterf,
Optional<BlobMigratorInterface>())));
blobMigratorInterf->set(Optional<BlobMigratorInterface>(recruited));
}
TraceEvent("BlobMigrator_InitRequest", req.reqId).detail("BlobMigratorId", recruited.id());
req.reply.send(recruited);
}
when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) {
if (!backupWorkerCache.exists(req.reqId)) {
LocalLineage _;
@ -3546,6 +3584,7 @@ const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD");
const Role Role::RATEKEEPER("Ratekeeper", "RK");
const Role Role::BLOB_MANAGER("BlobManager", "BM");
const Role Role::BLOB_WORKER("BlobWorker", "BW");
const Role Role::BLOB_MIGRATOR("BlobMigrator", "MG");
const Role Role::STORAGE_CACHE("StorageCache", "SC");
const Role Role::COORDINATOR("Coordinator", "CD");
const Role Role::BACKUP("Backup", "BK");

View File

@ -1538,6 +1538,22 @@ struct ConsistencyCheckWorkload : TestWorkload {
return false;
}
// Check BlobMigrator
if (config.blobGranulesEnabled && db.blobMigrator.present() &&
(!nonExcludedWorkerProcessMap.count(db.blobMigrator.get().address()) ||
nonExcludedWorkerProcessMap[db.blobMigrator.get().address()].processClass.machineClassFitness(
ProcessClass::BlobMigrator) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_BlobMigratorNotBest")
.detail("BestBlobMigratorFitness", fitnessLowerBound)
.detail(
"ExistingBlobMigratorFitness",
nonExcludedWorkerProcessMap.count(db.blobMigrator.get().address())
? nonExcludedWorkerProcessMap[db.blobMigrator.get().address()].processClass.machineClassFitness(
ProcessClass::BlobMigrator)
: -1);
return false;
}
// Check EncryptKeyProxy
if (SERVER_KNOBS->ENABLE_ENCRYPTION && db.encryptKeyProxy.present() &&
(!nonExcludedWorkerProcessMap.count(db.encryptKeyProxy.get().address()) ||