Remove cluster ID logic from individual roles

The logic to determine the validity of a process joining a cluster now
belongs on the worker and the cluster controller. It is no longer
restricted to tlogs and storages, but instead applies to all processes
(even stateless ones).
This commit is contained in:
Lukas Joswiak 2022-10-18 18:21:36 -07:00
parent cf6c5c3a47
commit 72bc89cf39
14 changed files with 54 additions and 325 deletions

View File

@ -613,7 +613,7 @@ private:
m.param1.startsWith(applyMutationsAddPrefixRange.begin) ||
m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || m.param1.startsWith(tagLocalityListPrefix) ||
m.param1.startsWith(serverTagHistoryPrefix) ||
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin) || m.param1 == clusterIdKey) {
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin)) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
}

View File

@ -1060,8 +1060,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
.detail("GrvProxies", req.grvProxies.size())
.detail("RecoveryCount", req.recoveryCount)
.detail("Stalled", req.recoveryStalled)
.detail("OldestBackupEpoch", req.logSystemConfig.oldestBackupEpoch)
.detail("ClusterId", req.clusterId);
.detail("OldestBackupEpoch", req.logSystemConfig.oldestBackupEpoch);
// make sure the request comes from an active database
auto db = &self->db;
@ -1120,7 +1119,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
// Construct the client information
if (db->clientInfo->get().commitProxies != req.commitProxies ||
db->clientInfo->get().grvProxies != req.grvProxies ||
db->clientInfo->get().tenantMode != db->config.tenantMode || db->clientInfo->get().clusterId != req.clusterId ||
db->clientInfo->get().tenantMode != db->config.tenantMode ||
db->clientInfo->get().isEncryptionEnabled != SERVER_KNOBS->ENABLE_ENCRYPTION ||
db->clientInfo->get().clusterType != db->clusterType ||
db->clientInfo->get().metaclusterName != db->metaclusterName ||
@ -1133,8 +1132,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
.detail("ReqCPs", req.commitProxies)
.detail("TenantMode", db->clientInfo->get().tenantMode.toString())
.detail("ReqTenantMode", db->config.tenantMode.toString())
.detail("ClusterId", db->clientInfo->get().clusterId)
.detail("ReqClusterId", req.clusterId)
.detail("EncryptionEnabled", SERVER_KNOBS->ENABLE_ENCRYPTION)
.detail("ClusterType", db->clientInfo->get().clusterType)
.detail("ReqClusterType", db->clusterType)
@ -1149,7 +1146,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
clientInfo.commitProxies = req.commitProxies;
clientInfo.grvProxies = req.grvProxies;
clientInfo.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode);
clientInfo.clusterId = req.clusterId;
clientInfo.clusterType = db->clusterType;
clientInfo.metaclusterName = db->metaclusterName;
db->clientInfo->set(clientInfo);
@ -1230,7 +1226,6 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
const WorkerInterface& w = req.wi;
if (req.clusterId.present() && self->clusterId->get().present() && req.clusterId != self->clusterId->get() &&
req.processClass != ProcessClass::TesterClass) {
// TODO: Track invalid processes separately, report status in status json
TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "WorkerBelongsToExistingCluster", self->id)
.detail("WorkerClusterId", req.clusterId)
.detail("ClusterControllerClusterId", self->clusterId->get())

View File

@ -297,7 +297,6 @@ ACTOR Future<Void> newTLogServers(Reference<ClusterRecoveryData> self,
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait(oldLogSystem->newEpoch(recr,
fRemoteWorkers,
self->clusterId,
self->configuration,
self->cstate.myDBState.recoveryCount + 1,
self->recoveryTransactionVersion,
@ -311,7 +310,6 @@ ACTOR Future<Void> newTLogServers(Reference<ClusterRecoveryData> self,
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait(oldLogSystem->newEpoch(recr,
Never(),
self->clusterId,
self->configuration,
self->cstate.myDBState.recoveryCount + 1,
self->recoveryTransactionVersion,
@ -347,7 +345,6 @@ ACTOR Future<Void> newSeedServers(Reference<ClusterRecoveryData> self,
isr.storeType = self->configuration.storageServerStoreType;
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = deterministicRandom()->randomUniqueID();
isr.clusterId = self->clusterId;
isr.initialClusterVersion = self->recoveryTransactionVersion;
ErrorOr<InitializeStorageReply> newServer = wait(recruits.storageServers[idx].storage.tryGetReply(isr));
@ -477,7 +474,6 @@ ACTOR Future<Void> trackTlogRecovery(Reference<ClusterRecoveryData> self,
self->dbgid)
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.detail("ClusterId", self->clusterId)
.trackLatest(self->clusterRecoveryStateEventHolder->trackingKey);
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_GENERATION_EVENT_NAME).c_str(),
@ -786,7 +782,6 @@ Future<Void> sendMasterRegistration(ClusterRecoveryData* self,
masterReq.priorCommittedLogServers = priorCommittedLogServers;
masterReq.recoveryState = self->recoveryState;
masterReq.recoveryStalled = self->recruitmentStalled->get();
masterReq.clusterId = self->clusterId;
return brokenPromiseToNever(self->clusterController.registerMaster.getReply(masterReq));
}
@ -1350,8 +1345,7 @@ ACTOR Future<Void> recoverFrom(Reference<ClusterRecoveryData> self,
Reference<ILogSystem> oldLogSystem,
std::vector<StorageServerInterface>* seedServers,
std::vector<Standalone<CommitTransactionRef>>* initialConfChanges,
Future<Version> poppedTxsVersion,
bool* clusterIdExists) {
Future<Version> poppedTxsVersion) {
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_transaction_system_state])
@ -1375,16 +1369,6 @@ ACTOR Future<Void> recoverFrom(Reference<ClusterRecoveryData> self,
debug_checkMaxRestoredVersion(UID(), self->lastEpochEnd, "DBRecovery");
// Generate a cluster ID to uniquely identify the cluster if it doesn't
// already exist in the txnStateStore.
Optional<Value> clusterId = self->txnStateStore->readValue(clusterIdKey).get();
*clusterIdExists = clusterId.present();
if (!clusterId.present()) {
self->clusterId = deterministicRandom()->randomUniqueID();
} else {
self->clusterId = BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
}
// Ordinarily we pass through this loop once and recover. We go around the loop if recovery stalls for more than a
// second, a provisional master is initialized, and an "emergency transaction" is submitted that might change the
// configuration so that we can finish recovery.
@ -1540,7 +1524,6 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
state Future<Void> logChanges;
state Future<Void> minRecoveryDuration;
state Future<Version> poppedTxsVersion;
state bool clusterIdExists = false;
loop {
Reference<ILogSystem> oldLogSystem = oldLogSystems->get();
@ -1556,13 +1539,9 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
self->registrationTrigger.trigger();
choose {
when(wait(oldLogSystem ? recoverFrom(self,
oldLogSystem,
&seedServers,
&initialConfChanges,
poppedTxsVersion,
std::addressof(clusterIdExists))
: Never())) {
when(wait(oldLogSystem
? recoverFrom(self, oldLogSystem, &seedServers, &initialConfChanges, poppedTxsVersion)
: Never())) {
reg.cancel();
break;
}
@ -1591,7 +1570,6 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
.detail("Status", RecoveryStatus::names[RecoveryStatus::recovery_transaction])
.detail("PrimaryLocality", self->primaryLocality)
.detail("DcId", self->masterInterface.locality.dcId())
.detail("ClusterId", self->clusterId)
.trackLatest(self->clusterRecoveryStateEventHolder->trackingKey);
// Recovery transaction
@ -1680,11 +1658,6 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
}
}
// Write cluster ID into txnStateStore if it is missing.
if (!clusterIdExists) {
tr.set(recoveryCommitRequest.arena, clusterIdKey, BinaryWriter::toValue(self->clusterId, Unversioned()));
}
applyMetadataMutations(SpanContext(),
self->dbgid,
recoveryCommitRequest.arena,

View File

@ -2284,15 +2284,12 @@ public:
self->recruitingIds.insert(interfaceId);
self->recruitingLocalities.insert(candidateWorker.worker.stableAddress());
UID clusterId = wait(self->getClusterId());
state InitializeStorageRequest isr;
isr.storeType = recruitTss ? self->configuration.testingStorageServerStoreType
: self->configuration.storageServerStoreType;
isr.seedTag = invalidTag;
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = interfaceId;
isr.clusterId = clusterId;
// if tss, wait for pair ss to finish and add its id to isr. If pair fails, don't recruit tss
state bool doRecruit = true;
@ -3470,10 +3467,6 @@ Future<Void> DDTeamCollection::monitorHealthyTeams() {
return DDTeamCollectionImpl::monitorHealthyTeams(this);
}
Future<UID> DDTeamCollection::getClusterId() {
return db->getClusterId();
}
Future<UID> DDTeamCollection::getNextWigglingServerID() {
Optional<Value> localityKey;
Optional<Value> localityValue;

View File

@ -221,21 +221,6 @@ class DDTxnProcessorImpl {
}
}
ACTOR static Future<UID> getClusterId(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
ASSERT(clusterId.present());
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Read keyservers, return unique set of teams
ACTOR static Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
Database cx,
@ -675,10 +660,6 @@ Future<int> DDTxnProcessor::tryUpdateReplicasKeyForDc(const Optional<Key>& dcId,
return DDTxnProcessorImpl::tryUpdateReplicasKeyForDc(cx, dcId, storageTeamSize);
}
Future<UID> DDTxnProcessor::getClusterId() const {
return DDTxnProcessorImpl::getClusterId(cx);
}
Future<Void> DDTxnProcessor::waitDDTeamInfoPrintSignal() const {
return DDTxnProcessorImpl::waitDDTeamInfoPrintSignal(cx);
}

View File

@ -26,7 +26,6 @@
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/SpanContextMessage.h"
#include "fdbserver/TLogInterface.h"
@ -217,8 +216,6 @@ static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr);
static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr);
static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr);
static const KeyRef persistClusterIdKey = "clusterId"_sr;
static Key persistTagMessagesKey(UID id, Tag tag, Version version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(persistTagMessagesKeys.begin);
@ -306,13 +303,6 @@ struct TLogData : NonCopyable {
Deque<UID> spillOrder;
std::map<UID, Reference<struct LogData>> id_data;
// The durable cluster ID identifies which cluster the tlogs persistent
// data is written from. This value is restored from disk when the tlog
// restarts.
UID durableClusterId;
// The cluster-controller cluster ID stores the cluster ID read from the txnStateStore.
// It is cached in this variable.
UID ccClusterId;
UID dbgid;
UID workerID;
@ -2401,24 +2391,6 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
ACTOR Future<UID> getClusterId(TLogData* self) {
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
if (clusterId.present()) {
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
} else {
return UID();
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// send stopped promise instead of LogData* to avoid reference cycles
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
@ -2441,26 +2413,14 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
}
isDisplaced = isDisplaced && !inf.logSystemConfig.hasTLog(tli.id());
if (isDisplaced) {
state TraceEvent ev("TLogDisplaced", tli.id());
ev.detail("Reason", "DBInfoDoesNotContain")
TraceEvent("TLogDisplaced", tli.id())
.detail("Reason", "DBInfoDoesNotContain")
.detail("RecoveryCount", recoveryCount)
.detail("InfRecoveryCount", inf.recoveryCount)
.detail("RecoveryState", (int)inf.recoveryState)
.detail("LogSysConf", describe(inf.logSystemConfig.tLogs))
.detail("PriorLogs", describe(inf.priorCommittedLogServers))
.detail("OldLogGens", inf.logSystemConfig.oldTLogs.size());
// Read and cache cluster ID before displacing this tlog. We want
// to avoid removing the tlogs data if it has joined a new cluster
// with a different cluster ID.
// TODO: #5375
/*
state UID clusterId = wait(getClusterId(self));
ASSERT(clusterId.isValid());
self->ccClusterId = clusterId;
ev.detail("ClusterId", clusterId).detail("SelfClusterId", self->durableClusterId);
*/
if (BUGGIFY)
wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01()));
throw worker_removed();
@ -2619,28 +2579,6 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
return Void();
}
// TODO: Remove all cluster ID logic from tlog and storage server
ACTOR Future<Void> updateDurableClusterID(TLogData* self) {
loop {
// Persist cluster ID once cluster has recovered.
if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED) {
ASSERT(!self->durableClusterId.isValid());
state UID ccClusterId = self->dbInfo->get().client.clusterId;
self->durableClusterId = ccClusterId;
ASSERT(ccClusterId.isValid());
wait(self->persistentDataCommitLock.take());
state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock);
self->persistentData->set(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(ccClusterId, Unversioned())));
wait(self->persistentData->commit());
return Void();
}
wait(self->dbInfo->onChange());
}
}
ACTOR Future<Void> serveTLogInterface(TLogData* self,
TLogInterface tli,
Reference<LogData> logData,
@ -3028,7 +2966,6 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
state IKeyValueStore* storage = self->persistentData;
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<Optional<Value>> fClusterId = storage->readValue(persistClusterIdKey);
state Future<RangeResult> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<RangeResult> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<RangeResult> fLocality = storage->readRange(persistLocalityKeys);
@ -3040,7 +2977,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// FIXME: metadata in queue?
wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fClusterId }));
wait(waitForAll(std::vector{ fFormat, fRecoveryLocation }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
@ -3050,10 +2987,6 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
fProtocolVersions,
fTLogSpillTypes }));
if (fClusterId.get().present()) {
self->durableClusterId = BinaryReader::fromStringRef<UID>(fClusterId.get().get(), Unversioned());
}
if (fFormat.get().present() && !persistFormatReadableRange.contains(fFormat.get().get())) {
// FIXME: remove when we no longer need to test upgrades from 4.X releases
if (g_network->isSimulated()) {
@ -3316,7 +3249,7 @@ bool tlogTerminated(TLogData* self, IKeyValueStore* persistentData, TLogQueue* p
}
if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed ||
e.code() == error_code_file_not_found || e.code() == error_code_invalid_cluster_id) {
e.code() == error_code_file_not_found) {
TraceEvent("TLogTerminated", self->dbgid).errorUnsuppressed(e);
return true;
} else
@ -3592,86 +3525,52 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
TraceEvent("SharedTlog", tlogId);
try {
try {
wait(ioTimeoutError(persistentData->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
wait(ioTimeoutError(persistentData->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {
wait(ioTimeoutError(checkEmptyQueue(&self) && initPersistentStorage(&self),
SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
}
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {
wait(ioTimeoutError(checkEmptyQueue(&self) && initPersistentStorage(&self),
SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
}
// Disk errors need a chance to kill this actor.
wait(delay(0.000001));
// Disk errors need a chance to kill this actor.
wait(delay(0.000001));
if (recovered.canBeSet())
recovered.send(Void());
if (recovered.canBeSet())
recovered.send(Void());
// if (!self.durableClusterId.isValid()) {
// self.sharedActors.send(updateDurableClusterID(&self));
// }
self.sharedActors.send(commitQueue(&self));
self.sharedActors.send(updateStorageLoop(&self));
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
state Future<Void> activeSharedChange = Void();
self.sharedActors.send(commitQueue(&self));
self.sharedActors.send(updateStorageLoop(&self));
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
state Future<Void> activeSharedChange = Void();
loop {
choose {
when(state InitializeTLogRequest req = waitNext(tlogRequests.getFuture())) {
if (!self.tlogCache.exists(req.recruitmentID)) {
self.tlogCache.set(req.recruitmentID, req.reply.getFuture());
self.sharedActors.send(
self.tlogCache.removeOnReady(req.recruitmentID, tLogStart(&self, req, locality)));
} else {
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
}
}
when(wait(error)) { throw internal_error(); }
when(wait(activeSharedChange)) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
stopAllTLogs(&self, tlogId);
TraceEvent("SharedTLogQueueSpilling", self.dbgid)
.detail("NowActive", activeSharedTLog->get());
self.sharedActors.send(startSpillingInTenSeconds(&self, tlogId, activeSharedTLog));
}
activeSharedChange = activeSharedTLog->onChange();
loop {
choose {
when(state InitializeTLogRequest req = waitNext(tlogRequests.getFuture())) {
if (!self.tlogCache.exists(req.recruitmentID)) {
self.tlogCache.set(req.recruitmentID, req.reply.getFuture());
self.sharedActors.send(
self.tlogCache.removeOnReady(req.recruitmentID, tLogStart(&self, req, locality)));
} else {
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
}
}
when(wait(error)) {
throw internal_error();
}
when(wait(activeSharedChange)) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
stopAllTLogs(&self, tlogId);
TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.sharedActors.send(startSpillingInTenSeconds(&self, tlogId, activeSharedTLog));
}
activeSharedChange = activeSharedTLog->onChange();
}
}
} catch (Error& e) {
throw;
// TODO: #5375
/*
if (e.code() != error_code_worker_removed) {
throw;
}
// Don't need to worry about deleting data if there is no durable
// cluster ID.
if (!self.durableClusterId.isValid()) {
throw;
}
// When a tlog joins a new cluster and has data for an old cluster,
// it should automatically exclude itself to avoid being used in
// the new cluster.
auto recoveryState = self.dbInfo->get().recoveryState;
if (recoveryState == RecoveryState::FULLY_RECOVERED && self.ccClusterId.isValid() &&
self.durableClusterId.isValid() && self.ccClusterId != self.durableClusterId) {
state NetworkAddress address = g_network->getLocalAddress();
wait(excludeServers(self.cx, { AddressExclusion{ address.ip, address.port } }));
TraceEvent(SevWarnAlways, "TLogBelongsToExistingCluster")
.detail("ClusterId", self.durableClusterId)
.detail("NewClusterId", self.ccClusterId);
}
// If the tlog has a valid durable cluster ID, we don't want it to
// wipe its data! Throw this error to signal to `tlogTerminated` to
// close the persistent data store instead of deleting it.
throw invalid_cluster_id();
*/
}
} catch (Error& e) {
self.terminated.send(Void());

View File

@ -1635,7 +1635,6 @@ Future<Void> TagPartitionedLogSystem::endEpoch() {
Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
RecruitFromConfigurationReply const& recr,
Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
UID clusterId,
DatabaseConfiguration const& config,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,
@ -1646,7 +1645,6 @@ Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
return newEpoch(Reference<TagPartitionedLogSystem>::addRef(this),
recr,
fRemoteWorkers,
clusterId,
config,
recoveryCount,
recoveryTransactionVersion,
@ -2546,7 +2544,6 @@ std::vector<Tag> TagPartitionedLogSystem::getLocalTags(int8_t locality, const st
ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSystem* self,
Reference<TagPartitionedLogSystem> oldLogSystem,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,
@ -2690,7 +2687,6 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
req.startVersion = logSet->startVersion;
req.logRouterTags = 0;
req.txsTags = self->txsTags;
req.clusterId = clusterId;
req.recoveryTransactionVersion = recoveryTransactionVersion;
}
@ -2742,7 +2738,6 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
Reference<TagPartitionedLogSystem> oldLogSystem,
RecruitFromConfigurationReply recr,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,
@ -2965,7 +2960,6 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
req.startVersion = logSystem->tLogs[0]->startVersion;
req.logRouterTags = logSystem->logRouterTags;
req.txsTags = logSystem->txsTags;
req.clusterId = clusterId;
req.recoveryTransactionVersion = recoveryTransactionVersion;
}
@ -3035,7 +3029,6 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
req.startVersion = oldLogSystem->knownCommittedVersion + 1;
req.logRouterTags = logSystem->logRouterTags;
req.txsTags = logSystem->txsTags;
req.clusterId = clusterId;
req.recoveryTransactionVersion = recoveryTransactionVersion;
}
@ -3094,7 +3087,6 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(),
oldLogSystem,
fRemoteWorkers,
clusterId,
configuration,
recoveryCount,
recoveryTransactionVersion,

View File

@ -468,8 +468,6 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
bool recruitTss,
Reference<TSSPairState> tssState);
Future<UID> getClusterId();
// return the next ServerID in storageWiggler
Future<UID> getNextWigglingServerID();

View File

@ -136,8 +136,6 @@ public:
virtual Future<Optional<Value>> readRebalanceDDIgnoreKey() const { return {}; }
virtual Future<UID> getClusterId() const { return {}; }
virtual Future<Void> waitDDTeamInfoPrintSignal() const { return Never(); }
virtual Future<std::vector<ProcessData>> getWorkers() const = 0;
@ -221,8 +219,6 @@ public:
Future<Optional<Value>> readRebalanceDDIgnoreKey() const override;
Future<UID> getClusterId() const override;
Future<Void> waitDDTeamInfoPrintSignal() const override;
Future<std::vector<ProcessData>> getWorkers() const override;

View File

@ -641,7 +641,6 @@ struct ILogSystem {
virtual Future<Reference<ILogSystem>> newEpoch(
RecruitFromConfigurationReply const& recr,
Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
UID clusterId,
DatabaseConfiguration const& config,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,

View File

@ -269,7 +269,6 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
// The new epoch is only provisional until the caller updates the coordinated DBCoreState.
Future<Reference<ILogSystem>> newEpoch(RecruitFromConfigurationReply const& recr,
Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
UID clusterId,
DatabaseConfiguration const& config,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,
@ -350,7 +349,6 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
ACTOR static Future<Void> newRemoteEpoch(TagPartitionedLogSystem* self,
Reference<TagPartitionedLogSystem> oldLogSystem,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,
@ -360,7 +358,6 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
ACTOR static Future<Reference<ILogSystem>> newEpoch(Reference<TagPartitionedLogSystem> oldLogSystem,
RecruitFromConfigurationReply recr,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
Version recoveryTransactionVersion,

View File

@ -260,7 +260,6 @@ struct RegisterMasterRequest {
std::vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
UID clusterId;
ReplyPromise<Void> reply;
@ -284,7 +283,6 @@ struct RegisterMasterRequest {
priorCommittedLogServers,
recoveryState,
recoveryStalled,
clusterId,
reply);
}
};
@ -592,7 +590,6 @@ struct InitializeTLogRequest {
Version startVersion;
int logRouterTags;
int txsTags;
UID clusterId;
Version recoveryTransactionVersion;
ReplyPromise<struct TLogInterface> reply;
@ -619,7 +616,6 @@ struct InitializeTLogRequest {
logVersion,
spillType,
txsTags,
clusterId,
recoveryTransactionVersion);
}
};
@ -819,14 +815,12 @@ struct InitializeStorageRequest {
KeyValueStoreType storeType;
Optional<std::pair<UID, Version>>
tssPairIDAndVersion; // Only set if recruiting a tss. Will be the UID and Version of its SS pair.
UID clusterId; // Unique cluster identifier. Only needed at recruitment, will be read from txnStateStore on recovery
Version initialClusterVersion;
ReplyPromise<InitializeStorageReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(
ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, clusterId, initialClusterVersion);
serializer(ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, initialClusterVersion);
}
};
@ -1148,7 +1142,6 @@ class IPageEncryptionKeyProvider;
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version startVersion,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,

View File

@ -169,7 +169,6 @@ static const KeyRef persistID = PERSIST_PREFIX "ID"_sr;
static const KeyRef persistTssPairID = PERSIST_PREFIX "tssPairID"_sr;
static const KeyRef persistSSPairID = PERSIST_PREFIX "ssWithTSSPairID"_sr;
static const KeyRef persistTssQuarantine = PERSIST_PREFIX "tssQ"_sr;
static const KeyRef persistClusterIdKey = PERSIST_PREFIX "clusterId"_sr;
// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = PERSIST_PREFIX "Version"_sr;
@ -973,7 +972,6 @@ public:
Reference<ILogSystem> logSystem;
Reference<ILogSystem::IPeekCursor> logCursor;
Promise<UID> clusterId;
// The version the cluster starts on. This value is not persisted and may
// not be valid after a recovery.
Version initialClusterVersion = 1;
@ -9354,9 +9352,6 @@ void StorageServerDisk::makeNewStorageServerDurable(const bool shardAware) {
if (data->tssPairID.present()) {
storage->set(KeyValueRef(persistTssPairID, BinaryWriter::toValue(data->tssPairID.get(), Unversioned())));
}
ASSERT(data->clusterId.getFuture().isReady() && data->clusterId.getFuture().get().isValid());
storage->set(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(data->clusterId.getFuture().get(), Unversioned())));
storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())));
if (shardAware) {
@ -9661,54 +9656,9 @@ ACTOR Future<Void> restoreByteSample(StorageServer* data,
return Void();
}
// Reads the cluster ID from the transaction state store.
ACTOR Future<UID> getClusterId(StorageServer* self) {
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
self->cx->invalidateCache(Key(), systemKeys);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
ASSERT(clusterId.present());
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Read the cluster ID from the transaction state store and persist it to local
// storage. This function should only be necessary during an upgrade when the
// prior FDB version did not support cluster IDs. The normal path for storage
// server recruitment will include the cluster ID in the initial recruitment
// message.
ACTOR Future<Void> persistClusterId(StorageServer* self) {
state Transaction tr(self->cx);
loop {
try {
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
if (clusterId.present()) {
auto uid = BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
self->storage.writeKeyValue(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(uid, Unversioned())));
// Purposely not calling commit here, and letting the recurring
// commit handle save this value to disk
self->clusterId.send(uid);
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* storage) {
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fID = storage->readValue(persistID);
state Future<Optional<Value>> fClusterID = storage->readValue(persistClusterIdKey);
state Future<Optional<Value>> ftssPairID = storage->readValue(persistTssPairID);
state Future<Optional<Value>> fssPairID = storage->readValue(persistSSPairID);
state Future<Optional<Value>> fTssQuarantine = storage->readValue(persistTssQuarantine);
@ -9729,8 +9679,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
restoreByteSample(data, storage, byteSampleSampleRecovered, startByteSampleRestore.getFuture());
TraceEvent("ReadingDurableState", data->thisServerID).log();
wait(waitForAll(std::vector{
fFormat, fID, fClusterID, ftssPairID, fssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(
std::vector{ fFormat, fID, ftssPairID, fssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fShardAssigned,
fShardAvailable,
fChangeFeeds,
@ -9771,14 +9721,6 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
data->bytesRestored += fssPairID.get().expectedSize();
}
if (fClusterID.get().present()) {
data->clusterId.send(BinaryReader::fromStringRef<UID>(fClusterID.get().get(), Unversioned()));
data->bytesRestored += fClusterID.get().expectedSize();
} else {
CODE_PROBE(true, "storage server upgraded to version supporting cluster IDs");
data->actors.add(persistClusterId(data));
}
// It's a bit sketchy to rely on an untrusted storage engine to persist its quarantine state when the quarantine
// state means the storage engine already had a durability or correctness error, but it should get
// re-quarantined very quickly because of a mismatch if it starts trying to do things again
@ -10979,7 +10921,6 @@ ACTOR Future<Void> storageInterfaceRegistration(StorageServer* self,
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version startVersion,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
@ -10989,7 +10930,6 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
state StorageServer self(persistentData, db, ssi, encryptionKeyProvider);
self.shardAware = SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && persistentData->shardAware();
state Future<Void> ssCore;
self.clusterId.send(clusterId);
self.initialClusterVersion = startVersion;
if (ssi.isTss()) {
self.setTssPair(ssi.tssPairID.get());
@ -11136,32 +11076,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
wait(delay(0));
ErrorOr<Void> e = wait(errorOr(f));
if (e.isError()) {
Error e = f.getError();
throw e;
// TODO: #5375
/*
if (e.code() != error_code_worker_removed) {
throw e;
}
state UID clusterId = wait(getClusterId(&self));
ASSERT(self.clusterId.isValid());
UID durableClusterId = wait(self.clusterId.getFuture());
ASSERT(durableClusterId.isValid());
if (clusterId == durableClusterId) {
throw worker_removed();
}
// When a storage server connects to a new cluster, it deletes its
// old data and creates a new, empty data file for the new cluster.
// We want to avoid this and force a manual removal of the storage
// servers' old data when being assigned to a new cluster to avoid
// accidental data loss.
TraceEvent(SevWarn, "StorageServerBelongsToExistingCluster")
.detail("ServerID", ssi.id())
.detail("ClusterID", durableClusterId)
.detail("NewClusterID", clusterId);
wait(Future<Void>(Never()));
*/
throw f.getError();
}
self.interfaceRegistered =

View File

@ -2060,7 +2060,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
}
loop choose {
when(state UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(
req.serializedDbInfo, AssumeVersion(g_network->protocolVersion()));
localInfo.myLocality = locality;
@ -2464,7 +2464,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
runningStorages.end(),
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
ASSERT(req.clusterId.isValid());
ASSERT(req.initialClusterVersion >= 0);
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
@ -2531,7 +2530,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Future<Void> s = storageServer(data,
recruited,
req.seedTag,
req.clusterId,
req.initialClusterVersion,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,