Use special BlobMigrator UID and remove the data movement restrictions for blob migrator (#9839)
This commit is contained in:
parent
0e720634f3
commit
d5dc848626
|
@ -2434,7 +2434,7 @@ ACTOR Future<Void> startBlobMigrator(ClusterControllerData* self, double waitTim
|
||||||
ProcessClass::NeverAssign,
|
ProcessClass::NeverAssign,
|
||||||
self->db.config,
|
self->db.config,
|
||||||
id_used);
|
id_used);
|
||||||
InitializeBlobMigratorRequest req(deterministicRandom()->randomUniqueID());
|
InitializeBlobMigratorRequest req(BlobMigratorInterface::newId());
|
||||||
state WorkerDetails worker = blobMigratorWorker.worker;
|
state WorkerDetails worker = blobMigratorWorker.worker;
|
||||||
if (self->onMasterIsBetter(worker, ProcessClass::BlobMigrator)) {
|
if (self->onMasterIsBetter(worker, ProcessClass::BlobMigrator)) {
|
||||||
worker = self->id_worker[self->masterProcessId.get()].details;
|
worker = self->id_worker[self->masterProcessId.get()].details;
|
||||||
|
|
|
@ -401,6 +401,11 @@ bool canLaunchSrc(RelocateData& relocation,
|
||||||
ASSERT(relocation.src.size() != 0);
|
ASSERT(relocation.src.size() != 0);
|
||||||
ASSERT(teamSize >= singleRegionTeamSize);
|
ASSERT(teamSize >= singleRegionTeamSize);
|
||||||
|
|
||||||
|
// Blob migrator is backed by s3 so it can allow unlimited data movements
|
||||||
|
if (relocation.src.size() == 1 && BlobMigratorInterface::isBlobMigrator(relocation.src.back())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// find the "workFactor" for this, were it launched now
|
// find the "workFactor" for this, were it launched now
|
||||||
int workFactor = getSrcWorkFactor(relocation, singleRegionTeamSize);
|
int workFactor = getSrcWorkFactor(relocation, singleRegionTeamSize);
|
||||||
int neededServers = std::min<int>(relocation.src.size(), teamSize - singleRegionTeamSize + 1);
|
int neededServers = std::min<int>(relocation.src.size(), teamSize - singleRegionTeamSize + 1);
|
||||||
|
@ -424,6 +429,7 @@ bool canLaunchSrc(RelocateData& relocation,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,9 @@ struct BlobMigratorInterface {
|
||||||
BlobMigratorInterface() {}
|
BlobMigratorInterface() {}
|
||||||
BlobMigratorInterface(const struct LocalityData& l, UID id) : uniqueID(id), locality(l) {
|
BlobMigratorInterface(const struct LocalityData& l, UID id) : uniqueID(id), locality(l) {
|
||||||
ssi.locality = l;
|
ssi.locality = l;
|
||||||
ssi.uniqueID = id;
|
// The second 8 bytes of all blob migration interface id is fixed
|
||||||
|
ASSERT(id.second() == file_identifier);
|
||||||
|
ssi.uniqueID = uniqueID;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initEndpoints() { ssi.initEndpoints(); }
|
void initEndpoints() { ssi.initEndpoints(); }
|
||||||
|
@ -47,6 +49,9 @@ struct BlobMigratorInterface {
|
||||||
bool operator==(const BlobMigratorInterface& r) const { return id() == r.id(); }
|
bool operator==(const BlobMigratorInterface& r) const { return id() == r.id(); }
|
||||||
bool operator!=(const BlobMigratorInterface& r) const { return !(*this == r); }
|
bool operator!=(const BlobMigratorInterface& r) const { return !(*this == r); }
|
||||||
|
|
||||||
|
static UID newId() { return UID(deterministicRandom()->randomUInt64(), file_identifier); }
|
||||||
|
static bool isBlobMigrator(UID id) { return id.second() == file_identifier; };
|
||||||
|
|
||||||
template <class Archive>
|
template <class Archive>
|
||||||
void serialize(Archive& ar) {
|
void serialize(Archive& ar) {
|
||||||
serializer(ar, locality, uniqueID, haltBlobMigrator, waitFailure);
|
serializer(ar, locality, uniqueID, haltBlobMigrator, waitFailure);
|
||||||
|
|
Loading…
Reference in New Issue