Move cluster ID from txnStateStore to the database
The cluster ID is now stored in the database instead of in the txnStateStore. The cluster controller will read it on boot and send it to all processes to persist.
This commit is contained in:
parent
5ca2b89bdf
commit
bba05b7c9b
|
@ -284,8 +284,6 @@ const KeyRangeRef readConflictRangeKeysRange =
|
|||
const KeyRangeRef writeConflictRangeKeysRange = KeyRangeRef("\xff\xff/transaction/write_conflict_range/"_sr,
|
||||
"\xff\xff/transaction/write_conflict_range/\xff\xff"_sr);
|
||||
|
||||
const KeyRef clusterIdKey = "\xff/clusterId"_sr;
|
||||
|
||||
const KeyRangeRef auditRange = KeyRangeRef("\xff/audit/"_sr, "\xff/audit0"_sr);
|
||||
const KeyRef auditPrefix = auditRange.begin;
|
||||
|
||||
|
@ -1074,6 +1072,9 @@ const KeyRangeRef timeKeeperPrefixRange("\xff\x02/timeKeeper/map/"_sr, "\xff\x02
|
|||
const KeyRef timeKeeperVersionKey = "\xff\x02/timeKeeper/version"_sr;
|
||||
const KeyRef timeKeeperDisableKey = "\xff\x02/timeKeeper/disable"_sr;
|
||||
|
||||
// Durable cluster ID key
|
||||
const KeyRef clusterIdKey = "\xff/clusterId"_sr;
|
||||
|
||||
// Backup Log Mutation constant variables
|
||||
const KeyRef backupEnabledKey = "\xff/backupEnabled"_sr;
|
||||
const KeyRangeRef backupLogKeys("\xff\x02/blog/"_sr, "\xff\x02/blog0"_sr);
|
||||
|
@ -1810,4 +1811,4 @@ TEST_CASE("noSim/SystemData/compat/KeyServers") {
|
|||
printf("ssi serdes test complete\n");
|
||||
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,8 +92,6 @@ void decodeKeyServersValue(RangeResult result,
|
|||
UID& destID,
|
||||
bool missingIsError = true);
|
||||
|
||||
extern const KeyRef clusterIdKey;
|
||||
|
||||
extern const KeyRangeRef auditRange;
|
||||
extern const KeyRef auditPrefix;
|
||||
const Key auditRangeKey(const AuditType type, const UID& auditId, const KeyRef& key);
|
||||
|
@ -505,6 +503,9 @@ extern const KeyRangeRef timeKeeperPrefixRange;
|
|||
extern const KeyRef timeKeeperVersionKey;
|
||||
extern const KeyRef timeKeeperDisableKey;
|
||||
|
||||
// Durable cluster ID key
|
||||
extern const KeyRef clusterIdKey;
|
||||
|
||||
// Layer status metadata prefix
|
||||
extern const KeyRangeRef layerStatusMetaPrefixRange;
|
||||
|
||||
|
|
|
@ -2976,6 +2976,51 @@ ACTOR Future<Void> metaclusterMetricsUpdater(ClusterControllerData* self) {
|
|||
}
|
||||
}
|
||||
|
||||
// Update the DBInfo state with this processes cluster ID. If this process does
|
||||
// not have a cluster ID and one does not exist in the database, generate one.
|
||||
ACTOR Future<Void> updateClusterId(ClusterControllerData* self) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
|
||||
loop {
|
||||
try {
|
||||
state Optional<UID> durableClusterId = self->clusterId->get();
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
Optional<Value> clusterIdVal = wait(tr->get(clusterIdKey));
|
||||
|
||||
if (clusterIdVal.present()) {
|
||||
UID clusterId = BinaryReader::fromStringRef<UID>(clusterIdVal.get(), IncludeVersion());
|
||||
if (durableClusterId.present()) {
|
||||
// If this process has an on disk file for the cluster ID,
|
||||
// verify it matches the value in the database.
|
||||
ASSERT(clusterId == durableClusterId.get());
|
||||
} else {
|
||||
// Otherwise, write the cluster ID in the database to the
|
||||
// DbInfo object so all clients will learn of the cluster
|
||||
// ID.
|
||||
durableClusterId = clusterId;
|
||||
}
|
||||
} else if (!durableClusterId.present()) {
|
||||
// No cluster ID exists in the database or on the machine. Generate and set one.
|
||||
ASSERT(!durableClusterId.present());
|
||||
durableClusterId = deterministicRandom()->randomUniqueID();
|
||||
tr->set(clusterIdKey, BinaryWriter::toValue(durableClusterId.get(), IncludeVersion()));
|
||||
wait(tr->commit());
|
||||
}
|
||||
auto serverInfo = self->db.serverInfo->get();
|
||||
if (!serverInfo.client.clusterId.isValid()) {
|
||||
ASSERT(durableClusterId.present());
|
||||
serverInfo.id = deterministicRandom()->randomUniqueID();
|
||||
serverInfo.client.clusterId = durableClusterId.get();
|
||||
self->db.serverInfo->set(serverInfo);
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
|
||||
Future<Void> leaderFail,
|
||||
ServerCoordinators coordinators,
|
||||
|
@ -3020,6 +3065,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
|
|||
self.addActor.send(monitorConsistencyScan(&self));
|
||||
self.addActor.send(metaclusterMetricsUpdater(&self));
|
||||
self.addActor.send(dbInfoUpdater(&self));
|
||||
self.addActor.send(updateClusterId(&self));
|
||||
self.addActor.send(self.clusterControllerMetrics.traceCounters("ClusterControllerMetrics",
|
||||
self.id,
|
||||
SERVER_KNOBS->STORAGE_LOGGING_DELAY,
|
||||
|
|
|
@ -2097,13 +2097,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
errorForwarders.add(
|
||||
success(broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, notUpdated, true)));
|
||||
|
||||
if (!clusterId->get().present() && localInfo.recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
|
||||
localInfo.client.clusterId.isValid()) {
|
||||
// Persist the cluster ID as a file in the data
|
||||
// directory once recovery has made the transaction
|
||||
// state store durable. The txnStateStore also stores
|
||||
// the cluster ID.
|
||||
// TODO: Does the txnStateStore need to store the cluster ID?
|
||||
if (!clusterId->get().present() && localInfo.client.clusterId.isValid()) {
|
||||
state UID tmpClusterId = localInfo.client.clusterId;
|
||||
wait(createClusterIdFile(folder, tmpClusterId));
|
||||
clusterId->set(tmpClusterId);
|
||||
|
|
Loading…
Reference in New Issue