Revert "Revert "Refactor: ClusterController driving cluster-recovery state machine" (#6191)

* Revert "Revert "Refactor: ClusterController driving cluster-recovery state machine""

Major changes include:
1. Re-revert the Sequencer refactor commits listed below (in the order listed):
1.a. This reverts commit bb17e194d9.
1.b. This reverts commit d174bb2e06.
1.c. This reverts commit 30b05b469c.

2. Update Status.actor to query the ClusterController interface for
   recovery status.
3. Introduce a ServerKnob to define the "cluster recovery trace event"
   prefix. For now it defaults to "Master"; however, the knob allows a
   smooth transition to the more appropriate "Cluster" prefix later
   (see the sketch below for how the prefix composes event names).
Authored by Ata E Husain Bohra on 2022-01-06 12:15:51 -08:00; committed by GitHub
parent 5c3f6d13af
commit 936bf5336a
24 changed files with 6019 additions and 5370 deletions

View File

@ -55,7 +55,7 @@ Recovery has 9 phases, which are defined as the 9 states in the source code: REA
The recovery process is like a state machine, changing from one state to the next state.
We will describe in the rest of this document what each phase does to drive the recovery to the next state.
Recovery tracks the information of each recovery phase in `MasterRecoveryState` trace event. By checking the message, we can find which phase the recovery is stuck at. The status used in the `MasterRecoveryState` trace event is defined as `RecoveryStatus` structure in `RecoveryState.h`. The status, instead of the name of the 9 phases, is typically used in diagnosing production issues.
Recovery tracks the information of each recovery phase in `<RecoveryEventPrefix>RecoveryState` trace event. By checking the message, we can find which phase the recovery is stuck at. The status used in the `<RecoveryEventPrefix>RecoveryState` trace event is defined as `RecoveryStatus` structure in `RecoveryState.h`. The status, instead of the name of the 9 phases, is typically used in diagnosing production issues.
## Phase 1: READING_CSTATE
@ -139,8 +139,9 @@ However, reading the txnStateStore can be slow because it needs to read from dis
**Recruiting roles step.**
There are cases where the recovery can get stuck at recruiting enough roles for the txn system configuration. For example, if a cluster with replica factor equal to three has only three tLogs and one of them dies during the recovery, the cluster will not succeed in recruiting 3 tLogs and the recovery will get stuck. Another example is when a new database is created and the cluster does not have a valid txnStateStore. To get out of this situation, the master will use an emergency transaction to forcibly change the configuration such that the recruitment can succeed. This configuration change may temporarily violate the contract of the desired configuration, but it is only temporary.
ServerKnobs::CLUSTER_RECOVERY_EVENT_NAME_PREFIX defines the prefix for cluster recovery trace events. Hereafter it is referred to as 'RecoveryEventPrefix' in this document.
We can use the trace event `MasterRecoveredConfig`, which dumps the information of the new transaction system's configuration, to diagnose why the recovery is blocked in this phase.
We can use the trace event `<RecoveryEventPrefix>RecoveredConfig`, which dumps the information of the new transaction system's configuration, to diagnose why the recovery is blocked in this phase.
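As a hedged illustration of how these prefixed events can be consumed for diagnosis, the sketch below scans an fdbserver XML trace file for the recovery-state event to see which phase recovery last reported. The trace file name and the assumption that each event is a single line carrying a `Type="..."` attribute are illustrative only:

```cpp
// Illustrative sketch only: find the (prefixed) recovery-state events in a trace file.
#include <fstream>
#include <iostream>
#include <string>

int main() {
	const std::string prefix = "Master"; // value of CLUSTER_RECOVERY_EVENT_NAME_PREFIX
	const std::string needle = "Type=\"" + prefix + "RecoveryState\"";
	std::ifstream trace("trace.0.0.0.0.xml"); // hypothetical trace file name
	std::string line;
	while (std::getline(trace, line)) {
		if (line.find(needle) != std::string::npos)
			std::cout << line << "\n"; // the last match shows the furthest phase reported
	}
	return 0;
}
```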
## Phase 4: RECOVERY_TRANSACTION

View File

@ -493,6 +493,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"name":{
"$enum":[
"unreachable_master_worker",
"unreachable_cluster_controller_worker",
"unreachable_dataDistributor_worker",
"unreachable_ratekeeper_worker",
"unreachable_blobManager_worker",

View File

@ -781,6 +781,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( LATENCY_SAMPLE_SIZE, 100000 );
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
// Cluster recovery
init( CLUSTER_RECOVERY_EVENT_NAME_PREFIX, "Master" );
// Blob granules
init( BG_URL, "" ); // TODO: store in system key space, eventually
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( randomize && BUGGIFY ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; }

View File

@ -736,6 +736,9 @@ public:
int LATENCY_SAMPLE_SIZE;
double LATENCY_METRICS_LOGGING_INTERVAL;
// Cluster recovery
std::string CLUSTER_RECOVERY_EVENT_NAME_PREFIX;
// blob granule stuff
// FIXME: configure url with database configuration instead of knob eventually
std::string BG_URL;

View File

@ -1086,7 +1086,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf,
TraceEvent("BackupWorkerDone", self.myId).detail("BackupEpoch", self.backupEpoch);
// Notify master so that this worker can be removed from log system, then this
// worker (for an old epoch's unfinished work) can safely exit.
wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
wait(brokenPromiseToNever(db->get().clusterInterface.notifyBackupWorkerDone.getReply(
BackupWorkerDoneRequest(self.myId, self.backupEpoch))));
break;
}

View File

@ -8,7 +8,10 @@ set(FDBSERVER_SRCS
BlobManager.actor.cpp
BlobManagerInterface.h
BlobWorker.actor.cpp
ClusterController.actor.h
ClusterController.actor.cpp
ClusterRecovery.actor.h
ClusterRecovery.actor.cpp
ConfigBroadcaster.actor.cpp
ConfigBroadcaster.h
ConfigDatabaseUnitTests.actor.cpp

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -0,0 +1,314 @@
/*
* ClusterRecovery.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#include <utility>
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H)
#define FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H
#include "fdbserver/ClusterRecovery.actor.g.h"
#elif !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_H)
#define FDBSERVER_CLUSTERRECOVERY_ACTOR_H
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
#include "fdbserver/ClusterController.actor.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/Error.h"
#include "flow/SystemMonitor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
typedef enum {
CLUSTER_RECOVERY_STATE_EVENT_NAME,
CLUSTER_RECOVERY_COMMIT_TLOG_EVENT_NAME,
CLUSTER_RECOVERY_DURATION_EVENT_NAME,
CLUSTER_RECOVERY_GENERATION_EVENT_NAME,
CLUSTER_RECOVERY_SS_RECRUITMENT_EVENT_NAME,
CLUSTER_RECOVERY_INVALID_CONFIG_EVENT_NAME,
CLUSTER_RECOVERY_RECOVERING_EVENT_NAME,
CLUSTER_RECOVERY_RECOVERED_EVENT_NAME,
CLUSTER_RECOVERY_SNAPSHOT_CHECK_EVENT_NAME,
CLUSTER_RECOVERY_PAUSE_AGENT_BACKUP_EVENT_NAME,
CLUSTER_RECOVERY_COMMIT_EVENT_NAME,
CLUSTER_RECOVERY_AVAILABLE_EVENT_NAME,
CLUSTER_RECOVERY_METRICS_EVENT_NAME,
CLUSTER_RECOVERY_LAST // Always the last entry
} ClusterRecoveryEventType;
ACTOR Future<Void> recoveryTerminateOnConflict(UID dbgid,
Promise<Void> fullyRecovered,
Future<Void> onConflict,
Future<Void> switchedState);
std::string& getRecoveryEventName(ClusterRecoveryEventType type);
class ReusableCoordinatedState : NonCopyable {
public:
Promise<Void> fullyRecovered;
DBCoreState prevDBState;
DBCoreState myDBState;
bool finalWriteStarted;
Future<Void> previousWrite;
ReusableCoordinatedState(ServerCoordinators const& coordinators,
PromiseStream<Future<Void>> const& addActor,
UID const& dbgid)
: finalWriteStarted(false), previousWrite(Void()), cstate(coordinators), coordinators(coordinators),
addActor(addActor), dbgid(dbgid) {}
Future<Void> read() { return _read(this); }
Future<Void> write(DBCoreState newState, bool finalWrite = false) {
previousWrite = _write(this, newState, finalWrite);
return previousWrite;
}
Future<Void> move(ClusterConnectionString const& nc) { return cstate.move(nc); }
private:
MovableCoordinatedState cstate;
ServerCoordinators coordinators;
PromiseStream<Future<Void>> addActor;
Promise<Void> switchedState;
UID dbgid;
ACTOR Future<Void> _read(ReusableCoordinatedState* self) {
Value prevDBStateRaw = wait(self->cstate.read());
Future<Void> onConflict = recoveryTerminateOnConflict(
self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture());
if (onConflict.isReady() && onConflict.isError()) {
throw onConflict.getError();
}
self->addActor.send(onConflict);
if (prevDBStateRaw.size()) {
self->prevDBState = BinaryReader::fromStringRef<DBCoreState>(prevDBStateRaw, IncludeVersion());
self->myDBState = self->prevDBState;
}
return Void();
}
ACTOR Future<Void> _write(ReusableCoordinatedState* self, DBCoreState newState, bool finalWrite) {
if (self->finalWriteStarted) {
wait(Future<Void>(Never()));
}
if (finalWrite) {
self->finalWriteStarted = true;
}
try {
wait(self->cstate.setExclusive(
BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState()))));
} catch (Error& e) {
TEST(true); // Master displaced during writeMasterState
throw;
}
self->myDBState = newState;
if (!finalWrite) {
self->switchedState.send(Void());
self->cstate = MovableCoordinatedState(self->coordinators);
Value rereadDBStateRaw = wait(self->cstate.read());
DBCoreState readState;
if (rereadDBStateRaw.size())
readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());
if (readState != newState) {
TraceEvent("RecoveryTerminated", self->dbgid).detail("Reason", "CStateChanged");
TEST(true); // Coordinated state changed between writing and reading, recovery restarting
throw worker_removed();
}
self->switchedState = Promise<Void>();
self->addActor.send(recoveryTerminateOnConflict(
self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture()));
} else {
self->fullyRecovered.send(Void());
}
return Void();
}
};
struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData> {
ClusterControllerData* controllerData;
UID dbgid;
AsyncTrigger registrationTrigger;
Version lastEpochEnd, // The last version in the old epoch not (to be) rolled back in this recovery
recoveryTransactionVersion; // The first version in this epoch
double lastCommitTime;
Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
bool databaseLocked;
Optional<Value> proxyMetadataVersion;
Version minKnownCommittedVersion;
DatabaseConfiguration originalConfiguration;
DatabaseConfiguration configuration;
std::vector<Optional<Key>> primaryDcId;
std::vector<Optional<Key>> remoteDcIds;
bool hasConfiguration;
ServerCoordinators coordinators;
Reference<ILogSystem> logSystem;
Version version; // The last version assigned to a proxy by getVersion()
double lastVersionTime;
LogSystemDiskQueueAdapter* txnStateLogAdapter;
IKeyValueStore* txnStateStore;
int64_t memoryLimit;
std::map<Optional<Value>, int8_t> dcId_locality;
std::vector<Tag> allTags;
int8_t getNextLocality() {
int8_t maxLocality = -1;
for (auto it : dcId_locality) {
maxLocality = std::max(maxLocality, it.second);
}
return maxLocality + 1;
}
std::vector<CommitProxyInterface> commitProxies;
std::vector<CommitProxyInterface> provisionalCommitProxies;
std::vector<GrvProxyInterface> grvProxies;
std::vector<GrvProxyInterface> provisionalGrvProxies;
std::vector<ResolverInterface> resolvers;
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
UID clusterId;
Standalone<StringRef> dbId;
MasterInterface masterInterface;
LifetimeToken masterLifetime;
const ClusterControllerFullInterface
clusterController; // If the cluster controller changes, this master will die, so this is immutable.
ReusableCoordinatedState cstate;
Promise<Void> recoveryReadyForCommits;
Promise<Void> cstateUpdated;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController
RecoveryState recoveryState;
AsyncVar<Standalone<VectorRef<ResolverMoveRef>>> resolverChanges;
Version resolverChangesVersion;
std::set<UID> resolverNeedingChanges;
PromiseStream<Future<Void>> addActor;
Reference<AsyncVar<bool>> recruitmentStalled;
bool forceRecovery;
bool neverCreated;
int8_t safeLocality;
int8_t primaryLocality;
std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.
CounterCollection cc;
Counter changeCoordinatorsRequests;
Counter getCommitVersionRequests;
Counter backupWorkerDoneRequests;
Counter getLiveCommittedVersionRequests;
Counter reportLiveCommittedVersionRequests;
Future<Void> logger;
Reference<EventCacheHolder> recoveredConfigEventHolder;
Reference<EventCacheHolder> clusterRecoveryStateEventHolder;
Reference<EventCacheHolder> clusterRecoveryGenerationsEventHolder;
Reference<EventCacheHolder> clusterRecoveryDurationEventHolder;
Reference<EventCacheHolder> clusterRecoveryAvailableEventHolder;
ClusterRecoveryData(ClusterControllerData* controllerData,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
MasterInterface const& masterInterface,
LifetimeToken const& masterLifetimeToken,
ServerCoordinators const& coordinators,
ClusterControllerFullInterface const& clusterController,
Standalone<StringRef> const& dbId,
PromiseStream<Future<Void>> const& addActor,
bool forceRecovery)
: controllerData(controllerData), dbgid(masterInterface.id()), lastEpochEnd(invalidVersion),
recoveryTransactionVersion(invalidVersion), lastCommitTime(0), liveCommittedVersion(invalidVersion),
databaseLocked(false), minKnownCommittedVersion(invalidVersion), hasConfiguration(false),
coordinators(coordinators), version(invalidVersion), lastVersionTime(0), txnStateStore(nullptr),
memoryLimit(2e9), dbId(dbId), masterInterface(masterInterface), masterLifetime(masterLifetimeToken),
clusterController(clusterController), cstate(coordinators, addActor, dbgid), dbInfo(dbInfo),
registrationCount(0), addActor(addActor), recruitmentStalled(makeReference<AsyncVar<bool>>(false)),
forceRecovery(forceRecovery), neverCreated(false), safeLocality(tagLocalityInvalid),
primaryLocality(tagLocalityInvalid), cc("Master", dbgid.toString()),
changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
getCommitVersionRequests("GetCommitVersionRequests", cc),
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
recoveredConfigEventHolder(makeReference<EventCacheHolder>("RecoveredConfig")) {
clusterRecoveryStateEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME));
clusterRecoveryGenerationsEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_GENERATION_EVENT_NAME));
clusterRecoveryDurationEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_DURATION_EVENT_NAME));
clusterRecoveryAvailableEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_AVAILABLE_EVENT_NAME));
logger = traceCounters(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME),
dbgid,
SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
&cc,
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME));
if (forceRecovery && !controllerData->clusterControllerDcId.present()) {
TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log();
forceRecovery = false;
}
}
~ClusterRecoveryData() {
if (txnStateStore)
txnStateStore->close();
}
};
ACTOR Future<Void> recruitNewMaster(ClusterControllerData* cluster,
ClusterControllerData::DBInfo* db,
MasterInterface* newMaster);
ACTOR Future<Void> cleanupRecoveryActorCollection(Reference<ClusterRecoveryData> self, bool exThrown);
ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self);
bool isNormalClusterRecoveryError(const Error&);
#include "flow/unactorcompiler.h"
#endif

View File

@ -642,6 +642,8 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
self->commitVersion = versionReply.version;
self->prevVersion = versionReply.prevVersion;
//TraceEvent("CPGetVersion", pProxyCommitData->dbgid).detail("Master", pProxyCommitData->master.id().toString()).detail("CommitVersion", self->commitVersion).detail("PrvVersion", self->prevVersion);
for (auto it : versionReply.resolverChanges) {
auto rs = pProxyCommitData->keyResolvers.modify(it.range);
for (auto r = rs.begin(); r != rs.end(); ++r)
@ -878,7 +880,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
if (!self->isMyFirstBatch &&
pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get() != self->oldCoordinators.get()) {
wait(brokenPromiseToNever(pProxyCommitData->master.changeCoordinators.getReply(
wait(brokenPromiseToNever(pProxyCommitData->db->get().clusterInterface.changeCoordinators.getReply(
ChangeCoordinatorsRequest(pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get()))));
ASSERT(false); // ChangeCoordinatorsRequest should always throw
}
@ -1093,8 +1095,16 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
applyMetadataEffect(self);
if (debugID.present()) {
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ApplyMetadaEffect");
}
determineCommittedTransactions(self);
if (debugID.present()) {
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ApplyMetadaEffect");
}
if (self->forceRecovery) {
wait(Future<Void>(Never()));
}
@ -1102,9 +1112,18 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
// First pass
wait(applyMetadataToCommittedTransactions(self));
if (debugID.present()) {
g_traceBatch.addEvent(
"CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ApplyMetadaToCommittedTxn");
}
// Second pass
wait(assignMutationsToStorageServers(self));
if (debugID.present()) {
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.AssignMutationToSS");
}
// Serialize and backup the mutations as a single mutation
if ((pProxyCommitData->vecBackupKeys.size() > 1) && self->logRangeMutations.size()) {
wait(addBackupMutations(pProxyCommitData,
@ -1241,7 +1260,7 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
}
} catch (Error& e) {
if (e.code() == error_code_broken_promise) {
throw master_tlog_failed();
throw tlog_failed();
}
throw;
}
@ -1273,8 +1292,10 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
const Optional<UID>& debugID = self->debugID;
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2)
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) {
//TraceEvent("CPAdvanceMinVersion", self->pProxyCommitData->dbgid).detail("PrvVersion", self->prevVersion).detail("CommitVersion", self->commitVersion).detail("Master", self->pProxyCommitData->master.id().toString()).detail("TxSize", self->trs.size());
debug_advanceMinCommittedVersion(UID(), self->commitVersion);
}
//TraceEvent("ProxyPushed", pProxyCommitData->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
if (debugID.present())
@ -1999,6 +2020,7 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
LifetimeToken masterLifetime,
Reference<AsyncVar<ServerDBInfo> const> db,
LogEpoch epoch,
Version recoveryTransactionVersion,
@ -2013,8 +2035,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
state Future<Void> lastCommitComplete = Void();
state PromiseStream<Future<Void>> addActor;
state Future<Void> onError =
transformError(actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed());
state Future<Void> onError = transformError(actorCollection(addActor.getFuture()), broken_promise(), tlog_failed());
state double lastCommit = 0;
state GetHealthMetricsReply healthMetricsReply;
@ -2026,7 +2047,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
//TraceEvent("CommitProxyInit1", proxy.id());
// Wait until we can load the "real" logsystem, since we don't support switching them currently
while (!(commitData.db->get().master.id() == master.id() &&
while (!(masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
wait(commitData.db->onChange());
@ -2088,7 +2109,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
loop choose {
when(wait(dbInfoChange)) {
dbInfoChange = commitData.db->onChange();
if (commitData.db->get().master.id() == master.id() &&
if (masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
for (auto it : commitData.tag_popped) {
@ -2106,7 +2127,9 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
const std::vector<CommitTransactionRequest>& trs = batchedRequests.first;
int batchBytes = batchedRequests.second;
//TraceEvent("CommitProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
//TraceEvent("CommitProxyCore", commitData.dbgid).detail("TxSize", trs.size()).detail("MasterLifetime", masterLifetime.toString()).detail("DbMasterLifetime", commitData.db->get().masterLifetime.toString()).detail("RecoveryState", commitData.db->get().recoveryState).detail("CCInf", commitData.db->get().clusterInterface.id().toString());
if (trs.size() || (commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
lastCommit = now();
@ -2155,6 +2178,7 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
try {
state Future<Void> core = commitProxyServerCore(proxy,
req.master,
req.masterLifetime,
db,
req.recoveryCount,
req.recoveryTransactionVersion,
@ -2165,7 +2189,7 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
TraceEvent("CommitProxyTerminated", proxy.id()).error(e, true);
if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out &&
e.code() != error_code_failed_to_progress) {
throw;

View File

@ -920,12 +920,12 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
MasterInterface master,
LifetimeToken masterLifetime,
Reference<AsyncVar<ServerDBInfo> const> db) {
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
state PromiseStream<Future<Void>> addActor;
state Future<Void> onError =
transformError(actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed());
state Future<Void> onError = transformError(actorCollection(addActor.getFuture()), broken_promise(), tlog_failed());
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
@ -933,9 +933,14 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
addActor.send(waitFailureServer(proxy.waitFailure.getFuture()));
addActor.send(traceRole(Role::GRV_PROXY, proxy.id()));
TraceEvent("GrvProxyServerCore", proxy.id())
.detail("MasterId", master.id())
.detail("MasterLifetime", masterLifetime.toString())
.detail("RecoveryCount", db->get().recoveryCount);
// Wait until we can load the "real" logsystem, since we don't support switching them currently
while (!(grvProxyData.db->get().master.id() == master.id() &&
grvProxyData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
while (!(masterLifetime.isEqual(grvProxyData.db->get().masterLifetime) &&
grvProxyData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS)) {
wait(grvProxyData.db->onChange());
}
// Do we need to wait for any db info change? Yes. To update latency band.
@ -956,7 +961,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
when(wait(dbInfoChange)) {
dbInfoChange = grvProxyData.db->onChange();
if (grvProxyData.db->get().master.id() == master.id() &&
if (masterLifetime.isEqual(grvProxyData.db->get().masterLifetime) &&
grvProxyData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
grvProxyData.logSystem =
ILogSystem::fromServerDBInfo(proxy.id(), grvProxyData.db->get(), false, addActor);
@ -983,13 +988,13 @@ ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
state Future<Void> core = grvProxyServerCore(proxy, req.master, req.masterLifetime, db);
wait(core || checkRemoved(db, req.recoveryCount, proxy));
} catch (Error& e) {
TraceEvent("GrvProxyTerminated", proxy.id()).error(e, true);
if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out) {
throw;
}

View File

@ -22,11 +22,13 @@
#define FDBSERVER_MASTERINTERFACE_H
#pragma once
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbserver/TLogInterface.h"
#include "fdbclient/Notified.h"
typedef uint64_t DBRecoveryCount;
@ -34,20 +36,17 @@ struct MasterInterface {
constexpr static FileIdentifier file_identifier = 5979145;
LocalityData locality;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct TLogRejoinRequest>
tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new master
RequestStream<struct ChangeCoordinatorsRequest> changeCoordinators;
RequestStream<struct GetCommitVersionRequest> getCommitVersion;
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
// Get the centralized live committed version reported by commit proxies.
RequestStream<struct GetRawCommittedVersionRequest> getLiveCommittedVersion;
// Report a proxy's committed version.
RequestStream<struct ReportRawCommittedVersionRequest> reportLiveCommittedVersion;
RequestStream<struct UpdateRecoveryDataRequest> updateRecoveryData;
NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
NetworkAddressList addresses() const { return changeCoordinators.getEndpoint().addresses; }
NetworkAddress address() const { return getCommitVersion.getEndpoint().getPrimaryAddress(); }
NetworkAddressList addresses() const { return getCommitVersion.getEndpoint().addresses; }
UID id() const { return changeCoordinators.getEndpoint().token; }
UID id() const { return getCommitVersion.getEndpoint().token; }
template <class Archive>
void serialize(Archive& ar) {
if constexpr (!is_fb_function<Archive>) {
@ -55,61 +54,28 @@ struct MasterInterface {
}
serializer(ar, locality, waitFailure);
if (Archive::isDeserializing) {
tlogRejoin = RequestStream<struct TLogRejoinRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
changeCoordinators =
RequestStream<struct ChangeCoordinatorsRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
getCommitVersion =
RequestStream<struct GetCommitVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(3));
notifyBackupWorkerDone =
RequestStream<struct BackupWorkerDoneRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
RequestStream<struct GetCommitVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
getLiveCommittedVersion =
RequestStream<struct GetRawCommittedVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(5));
RequestStream<struct GetRawCommittedVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
reportLiveCommittedVersion = RequestStream<struct ReportRawCommittedVersionRequest>(
waitFailure.getEndpoint().getAdjustedEndpoint(6));
waitFailure.getEndpoint().getAdjustedEndpoint(3));
updateRecoveryData =
RequestStream<struct UpdateRecoveryDataRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
}
}
void initEndpoints() {
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(waitFailure.getReceiver());
streams.push_back(tlogRejoin.getReceiver(TaskPriority::MasterTLogRejoin));
streams.push_back(changeCoordinators.getReceiver());
streams.push_back(getCommitVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
streams.push_back(notifyBackupWorkerDone.getReceiver());
streams.push_back(getLiveCommittedVersion.getReceiver(TaskPriority::GetLiveCommittedVersion));
streams.push_back(reportLiveCommittedVersion.getReceiver(TaskPriority::ReportLiveCommittedVersion));
streams.push_back(updateRecoveryData.getReceiver(TaskPriority::UpdateRecoveryTransactionVersion));
FlowTransport::transport().addEndpoints(streams);
}
};
struct TLogRejoinReply {
constexpr static FileIdentifier file_identifier = 11;
// false means someone else registered, so we should re-register. true means this master is recovered, so don't
// send again to the same master.
bool masterIsRecovered;
TLogRejoinReply() = default;
explicit TLogRejoinReply(bool masterIsRecovered) : masterIsRecovered(masterIsRecovered) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, masterIsRecovered);
}
};
struct TLogRejoinRequest {
constexpr static FileIdentifier file_identifier = 15692200;
TLogInterface myInterface;
ReplyPromise<TLogRejoinReply> reply;
TLogRejoinRequest() {}
explicit TLogRejoinRequest(const TLogInterface& interf) : myInterface(interf) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, myInterface, reply);
}
};
struct ChangeCoordinatorsRequest {
constexpr static FileIdentifier file_identifier = 13605416;
Standalone<StringRef> newConnectionString;
@ -184,6 +150,26 @@ struct GetCommitVersionRequest {
}
};
struct UpdateRecoveryDataRequest {
constexpr static FileIdentifier file_identifier = 13605417;
Version recoveryTransactionVersion;
Version lastEpochEnd;
std::vector<CommitProxyInterface> commitProxies;
ReplyPromise<Void> reply;
UpdateRecoveryDataRequest() {}
UpdateRecoveryDataRequest(Version recoveryTransactionVersion,
Version lastEpochEnd,
std::vector<CommitProxyInterface> commitProxies)
: recoveryTransactionVersion(recoveryTransactionVersion), lastEpochEnd(lastEpochEnd),
commitProxies(commitProxies) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, recoveryTransactionVersion, lastEpochEnd, commitProxies, reply);
}
};
struct ReportRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 1853148;
Version version;
@ -207,21 +193,6 @@ struct ReportRawCommittedVersionRequest {
}
};
struct BackupWorkerDoneRequest {
constexpr static FileIdentifier file_identifier = 8736351;
UID workerUID;
LogEpoch backupEpoch;
ReplyPromise<Void> reply;
BackupWorkerDoneRequest() : workerUID(), backupEpoch(-1) {}
BackupWorkerDoneRequest(UID id, LogEpoch epoch) : workerUID(id), backupEpoch(epoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, workerUID, backupEpoch, reply);
}
};
struct LifetimeToken {
UID ccID;
int64_t count;
@ -231,6 +202,9 @@ struct LifetimeToken {
bool isStillValid(LifetimeToken const& latestToken, bool isLatestID) const {
return ccID == latestToken.ccID && (count >= latestToken.count || isLatestID);
}
bool isEqual(LifetimeToken const& toCompare) {
return ccID.compare(toCompare.ccID) == 0 && count == toCompare.count;
}
std::string toString() const { return ccID.shortString() + format("#%lld", count); }
void operator++() { ++count; }
@ -240,4 +214,18 @@ struct LifetimeToken {
}
};
struct CommitProxyVersionReplies {
std::map<uint64_t, GetCommitVersionReply> replies;
NotifiedVersion latestRequestNum;
CommitProxyVersionReplies(CommitProxyVersionReplies&& r) noexcept
: replies(std::move(r.replies)), latestRequestNum(std::move(r.latestRequestNum)) {}
void operator=(CommitProxyVersionReplies&& r) noexcept {
replies = std::move(r.replies);
latestRequestNum = std::move(r.latestRequestNum);
}
CommitProxyVersionReplies() : latestRequestNum(0) {}
};
#endif

View File

@ -1249,11 +1249,11 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
}
}
ACTOR Future<Void> rejoinMasters(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithMaster) {
state UID lastMasterID(0, 0);
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithCC) {
state LifetimeToken lastMasterLifetime;
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@ -1274,18 +1274,21 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
throw worker_removed();
}
if (registerWithMaster.isReady()) {
if (self->dbInfo->get().master.id() != lastMasterID) {
if (registerWithCC.isReady()) {
if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req;
req.myInterface = tli;
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id())
.detail("ClusterController", self->dbInfo->get().clusterInterface.id())
.detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
.detail("LastMasterLifeTime", lastMasterLifetime.toString());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
when(TLogRejoinReply rep = wait(
brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
lastMasterID = self->dbInfo->get().master.id();
lastMasterLifetime = self->dbInfo->get().masterLifetime;
}
when(wait(self->dbInfo->onChange())) {}
}
@ -1293,7 +1296,7 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
wait(registerWithMaster || self->dbInfo->onChange());
wait(registerWithCC || self->dbInfo->onChange());
}
}
}
@ -1479,7 +1482,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
ASSERT(fVers.get().size() == fRecoverCounts.get().size());
state int idx = 0;
state Promise<Void> registerWithMaster;
state Promise<Void> registerWithCC;
for (idx = 0; idx < fVers.get().size(); idx++) {
state KeyRef rawId = fVers.get()[idx].key.removePrefix(persistCurrentVersionKeys.begin);
UID id1 = BinaryReader::fromStringRef<UID>(rawId, Unversioned());
@ -1513,7 +1516,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
logData->version.set(ver);
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed = rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture());
logData->removed = rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture());
removed.push_back(errorOr(logData->removed));
TraceEvent("TLogRestorePersistentStateVer", id1).detail("Ver", ver);
@ -1617,8 +1620,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
self->sharedActors.send(tLogCore(self, it.second));
}
if (registerWithMaster.canBeSet())
registerWithMaster.send(Void());
if (registerWithCC.canBeSet())
registerWithCC.send(Void());
return Void();
}

View File

@ -1729,12 +1729,12 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
ACTOR Future<Void> rejoinMasters(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithMaster,
bool isPrimary) {
state UID lastMasterID(0, 0);
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithCC,
bool isPrimary) {
state LifetimeToken lastMasterLifetime;
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@ -1762,17 +1762,20 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
throw worker_removed();
}
if (registerWithMaster.isReady()) {
if (self->dbInfo->get().master.id() != lastMasterID) {
if (registerWithCC.isReady()) {
if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id())
.detail("ClusterController", self->dbInfo->get().clusterInterface.id())
.detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
.detail("LastMasterLifeTime", lastMasterLifetime.toString());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
when(TLogRejoinReply rep = wait(
brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
lastMasterID = self->dbInfo->get().master.id();
lastMasterLifetime = self->dbInfo->get().masterLifetime;
}
when(wait(self->dbInfo->onChange())) {}
}
@ -1780,7 +1783,7 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
wait(registerWithMaster || self->dbInfo->onChange());
wait(registerWithCC || self->dbInfo->onChange());
}
}
}
@ -2384,7 +2387,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
state int idx = 0;
state Promise<Void> registerWithMaster;
state Promise<Void> registerWithCC;
state std::map<UID, TLogInterface> id_interf;
for (idx = 0; idx < fVers.get().size(); idx++) {
state KeyRef rawId = fVers.get()[idx].key.removePrefix(persistCurrentVersionKeys.begin);
@ -2432,7 +2435,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
removed.push_back(errorOr(logData->removed));
TraceEvent("TLogRestorePersistentStateVer", id1).detail("Ver", ver);
@ -2534,8 +2537,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false));
}
if (registerWithMaster.canBeSet())
registerWithMaster.send(Void());
if (registerWithCC.canBeSet())
registerWithCC.send(Void());
return Void();
}
@ -2653,7 +2656,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
logData->removed = rejoinMasters(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
self->queueOrder.push_back(recruited.id());
TraceEvent("TLogStart", logData->logId).log();

View File

@ -2174,12 +2174,12 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
ACTOR Future<Void> rejoinMasters(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithMaster,
bool isPrimary) {
state UID lastMasterID(0, 0);
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithCC,
bool isPrimary) {
state LifetimeToken lastMasterLifetime;
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@ -2207,17 +2207,20 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
throw worker_removed();
}
if (registerWithMaster.isReady()) {
if (self->dbInfo->get().master.id() != lastMasterID) {
if (registerWithCC.isReady()) {
if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id())
.detail("ClusterController", self->dbInfo->get().clusterInterface.id())
.detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
.detail("LastMasterLifeTime", lastMasterLifetime.toString());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
when(TLogRejoinReply rep = wait(
brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
lastMasterID = self->dbInfo->get().master.id();
lastMasterLifetime = self->dbInfo->get().masterLifetime;
}
when(wait(self->dbInfo->onChange())) {}
}
@ -2225,7 +2228,7 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
wait(registerWithMaster || self->dbInfo->onChange());
wait(registerWithCC || self->dbInfo->onChange());
}
}
}
@ -2846,7 +2849,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
state int idx = 0;
state Promise<Void> registerWithMaster;
state Promise<Void> registerWithCC;
state std::map<UID, TLogInterface> id_interf;
state std::vector<std::pair<Version, UID>> logsByVersion;
for (idx = 0; idx < fVers.get().size(); idx++) {
@ -2894,7 +2897,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
@ -3017,8 +3020,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false));
}
if (registerWithMaster.canBeSet())
registerWithMaster.send(Void());
if (registerWithCC.canBeSet())
registerWithCC.send(Void());
return Void();
}
@ -3135,7 +3138,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
logData->removed = rejoinMasters(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());

View File

@ -22,12 +22,14 @@
#include "contrib/fmt-8.0.1/include/fmt/format.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbserver/Status.h"
#include "flow/ITrace.h"
#include "flow/Trace.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbserver/WorkerInterface.actor.h"
#include <time.h>
#include "fdbserver/ClusterRecovery.actor.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/DataDistribution.actor.h"
#include "flow/UnitTest.h"
@ -1149,6 +1151,7 @@ static JsonBuilderObject clientStatusFetcher(
}
ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(Database cx,
WorkerDetails ccWorker,
WorkerDetails mWorker,
int workerCount,
std::set<std::string>* incomplete_reasons,
@ -1156,13 +1159,18 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(Database cx,
state JsonBuilderObject message;
state Transaction tr(cx);
try {
state Future<TraceEventFields> mdActiveGensF = timeoutError(
mWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MasterRecoveryGenerations"))),
1.0);
state Future<TraceEventFields> mdF = timeoutError(
mWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MasterRecoveryState"))), 1.0);
state Future<TraceEventFields> mDBAvailableF = timeoutError(
mWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MasterRecoveryAvailable"))), 1.0);
state Future<TraceEventFields> mdActiveGensF =
timeoutError(ccWorker.interf.eventLogRequest.getReply(EventLogRequest(StringRef(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_GENERATION_EVENT_NAME)))),
1.0);
state Future<TraceEventFields> mdF =
timeoutError(ccWorker.interf.eventLogRequest.getReply(EventLogRequest(StringRef(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME)))),
1.0);
state Future<TraceEventFields> mDBAvailableF =
timeoutError(ccWorker.interf.eventLogRequest.getReply(EventLogRequest(StringRef(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_AVAILABLE_EVENT_NAME)))),
1.0);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Future<ErrorOr<Version>> rvF = errorOr(timeoutError(tr.getReadVersion(), 1.0));
@ -2709,6 +2717,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state JsonBuilderArray messages;
state std::set<std::string> status_incomplete_reasons;
state WorkerDetails mWorker; // Master worker
state WorkerDetails ccWorker; // Cluster-Controller worker
state WorkerDetails ddWorker; // DataDistributor worker
state WorkerDetails rkWorker; // Ratekeeper worker
@ -2722,6 +2731,15 @@ ACTOR Future<StatusReply> clusterGetStatus(
JsonString::makeMessage("unreachable_master_worker", "Unable to locate the master worker."));
}
// Get the cluster-controller Worker interface
Optional<WorkerDetails> _ccWorker = getWorker(workers, db->get().clusterInterface.address());
if (_ccWorker.present()) {
ccWorker = _ccWorker.get();
} else {
messages.push_back(JsonString::makeMessage("unreachable_cluster_controller_worker",
"Unable to locate the cluster-controller worker."));
}
// Get the DataDistributor worker interface
Optional<WorkerDetails> _ddWorker;
if (db->get().distributor.present()) {
@ -2789,8 +2807,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
// construct status information for cluster subsections
state int statusCode = (int)RecoveryStatus::END;
state JsonBuilderObject recoveryStateStatus =
wait(recoveryStateStatusFetcher(cx, mWorker, workers.size(), &status_incomplete_reasons, &statusCode));
state JsonBuilderObject recoveryStateStatus = wait(
recoveryStateStatusFetcher(cx, ccWorker, mWorker, workers.size(), &status_incomplete_reasons, &statusCode));
// machine metrics
state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents();

View File

@ -319,9 +319,9 @@ struct TLogData : NonCopyable {
// data is written from. This value is restored from disk when the tlog
// restarts.
UID durableClusterId;
// The master cluster ID stores the cluster ID read from the txnStateStore.
// The cluster-controller cluster ID stores the cluster ID read from the txnStateStore.
// It is cached in this variable.
UID masterClusterId;
UID ccClusterId;
UID dbgid;
UID workerID;
@ -783,7 +783,7 @@ void TLogQueue::updateVersionSizes(const TLogQueueEntry& result,
ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply, Reference<LogData> logData) {
state Version stopVersion = logData->version.get();
TEST(true); // TLog stopped by recovering master
TEST(true); // TLog stopped by recovering cluster-controller
TEST(logData->stopped); // logData already stopped
TEST(!logData->stopped); // logData not yet stopped
@ -2251,12 +2251,12 @@ ACTOR Future<UID> getClusterId(TLogData* self) {
}
}
ACTOR Future<Void> rejoinMasters(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithMaster,
bool isPrimary) {
state UID lastMasterID(0, 0);
ACTOR Future<Void> rejoinClusterController(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
Future<Void> registerWithCC,
bool isPrimary) {
state LifetimeToken lastMasterLifetime;
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@ -2284,24 +2284,27 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
// with a different cluster ID.
state UID clusterId = wait(getClusterId(self));
ASSERT(clusterId.isValid());
self->masterClusterId = clusterId;
self->ccClusterId = clusterId;
ev.detail("ClusterId", clusterId).detail("SelfClusterId", self->durableClusterId);
if (BUGGIFY)
wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01()));
throw worker_removed();
}
if (registerWithMaster.isReady()) {
if (self->dbInfo->get().master.id() != lastMasterID) {
if (registerWithCC.isReady()) {
if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
TraceEvent("TLogRejoining", tli.id())
.detail("ClusterController", self->dbInfo->get().clusterInterface.id())
.detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
.detail("LastMasterLifeTime", lastMasterLifetime.toString());
choose {
when(TLogRejoinReply rep =
wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
when(TLogRejoinReply rep = wait(
brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
lastMasterID = self->dbInfo->get().master.id();
lastMasterLifetime = self->dbInfo->get().masterLifetime;
}
when(wait(self->dbInfo->onChange())) {}
}
@ -2309,7 +2312,7 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
wait(registerWithMaster || self->dbInfo->onChange());
wait(registerWithCC || self->dbInfo->onChange());
}
}
}
@ -2506,13 +2509,13 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
}
// Persist cluster ID once cluster has recovered.
auto masterClusterId = self->dbInfo->get().clusterId;
auto ccClusterId = self->dbInfo->get().clusterId;
if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED &&
!self->durableClusterId.isValid()) {
ASSERT(masterClusterId.isValid());
self->durableClusterId = masterClusterId;
ASSERT(ccClusterId.isValid());
self->durableClusterId = ccClusterId;
self->persistentData->set(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(masterClusterId, Unversioned())));
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(ccClusterId, Unversioned())));
wait(self->persistentData->commit());
}
}
@ -2958,7 +2961,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
state int idx = 0;
state Promise<Void> registerWithMaster;
state Promise<Void> registerWithCC;
state std::map<UID, TLogInterface> id_interf;
state std::vector<std::pair<Version, UID>> logsByVersion;
for (idx = 0; idx < fVers.get().size(); idx++) {
@ -3014,7 +3017,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
@ -3137,8 +3140,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false));
}
if (registerWithMaster.canBeSet())
registerWithMaster.send(Void());
if (registerWithCC.canBeSet())
registerWithCC.send(Void());
return Void();
}
@ -3263,7 +3266,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
logData->removed = rejoinMasters(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());
@ -3491,13 +3494,13 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
// it should automatically exclude itself to avoid being used in
// the new cluster.
auto recoveryState = self.dbInfo->get().recoveryState;
if (recoveryState == RecoveryState::FULLY_RECOVERED && self.masterClusterId.isValid() &&
self.durableClusterId.isValid() && self.masterClusterId != self.durableClusterId) {
if (recoveryState == RecoveryState::FULLY_RECOVERED && self.ccClusterId.isValid() &&
self.durableClusterId.isValid() && self.ccClusterId != self.durableClusterId) {
state NetworkAddress address = g_network->getLocalAddress();
wait(excludeServers(self.cx, { AddressExclusion{ address.ip, address.port } }));
TraceEvent(SevWarnAlways, "TLogBelongsToExistingCluster")
.detail("ClusterId", self.durableClusterId)
.detail("NewClusterId", self.masterClusterId);
.detail("NewClusterId", self.ccClusterId);
}
// If the tlog has a valid durable cluster ID, we don't want it to
// wipe its data! Throw this error to signal to `tlogTerminated` to

View File

@ -461,8 +461,8 @@ ACTOR Future<Void> TagPartitionedLogSystem::onError_internal(TagPartitionedLogSy
changes.push_back(self->backupWorkerChanged.onTrigger());
ASSERT(failed.size() >= 1);
wait(quorum(changes, 1) || tagError<Void>(quorum(failed, 1), master_tlog_failed()) ||
tagError<Void>(quorum(backupFailed, 1), master_backup_worker_failed()));
wait(quorum(changes, 1) || tagError<Void>(quorum(failed, 1), tlog_failed()) ||
tagError<Void>(quorum(backupFailed, 1), backup_worker_failed()));
}
}
@ -2300,7 +2300,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
auto reply = transformErrors(
throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed());
cluster_recovery_failed());
logRouterInitializationReplies.back().push_back(reply);
allReplies.push_back(reply);
nextRouter = (nextRouter + 1) % workers.size();
@ -2349,7 +2349,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
auto reply = transformErrors(
throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed());
cluster_recovery_failed());
logRouterInitializationReplies.back().push_back(reply);
allReplies.push_back(reply);
nextRouter = (nextRouter + 1) % workers.size();
@ -2410,7 +2410,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
if (!forRemote) {
self->logSystemConfigChanged.trigger();
wait(failed.size() ? tagError<Void>(quorum(failed, 1), master_tlog_failed()) : Future<Void>(Never()));
wait(failed.size() ? tagError<Void>(quorum(failed, 1), tlog_failed()) : Future<Void>(Never()));
throw internal_error();
}
return Void();
@ -2509,7 +2509,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
throwErrorOr(
remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
}
std::vector<Tag> localTags = TagPartitionedLogSystem::getLocalTags(remoteLocality, allTags);
@ -2587,7 +2587,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
remoteTLogInitializationReplies.push_back(transformErrors(
throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor(
remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs")
.detail("StartVersion", logSet->startVersion)
@ -2616,7 +2616,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
TLogRecoveryFinishedRequest(),
SERVER_KNOBS->TLOG_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
self->remoteRecoveryComplete = waitForAll(recoveryComplete);
self->tLogs.push_back(logSet);
@ -2857,7 +2857,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
initializationReplies.push_back(transformErrors(
throwErrorOr(recr.tLogs[i].tLog.getReplyUnlessFailedFor(
reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
state std::vector<Future<Void>> recoveryComplete;
@ -2924,7 +2924,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
satelliteInitializationReplies.push_back(transformErrors(
throwErrorOr(recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor(
sreqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
wait(waitForAll(satelliteInitializationReplies) || oldRouterRecruitment);
@ -2940,7 +2940,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
TLogRecoveryFinishedRequest(),
SERVER_KNOBS->TLOG_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
}
wait(waitForAll(initializationReplies) || oldRouterRecruitment);
@ -2955,7 +2955,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
// Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running
// recoveries causing tests to timeout
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation)
throw master_recovery_failed();
throw cluster_recovery_failed();
for (int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++)
recoveryComplete.push_back(transformErrors(
@ -2963,7 +2963,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
TLogRecoveryFinishedRequest(),
SERVER_KNOBS->TLOG_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
master_recovery_failed()));
cluster_recovery_failed()));
logSystem->recoveryComplete = waitForAll(recoveryComplete);
if (configuration.usableRegions > 1) {
@ -3057,7 +3057,7 @@ ACTOR Future<TLogLockResult> TagPartitionedLogSystem::lockTLog(
UID myID,
Reference<AsyncVar<OptionalInterface<TLogInterface>>> tlog) {
TraceEvent("TLogLockStarted", myID).detail("TLog", tlog->get().id());
TraceEvent("TLogLockStarted", myID).detail("TLog", tlog->get().id()).detail("InfPresent", tlog->get().present());
loop {
choose {
when(TLogLockResult data = wait(
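
The recruitment replies in this file are wrapped in the same transform-on-error pattern, and the refactor only substitutes `cluster_recovery_failed` (or `tlog_failed`) for the old master-prefixed errors. A plain-C++ sketch of the idea behind `transformErrors`, with exceptions standing in for flow errors; the real function operates on flow futures, not `std::function`:

```cpp
#include <functional>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for the error substituted by this refactor.
struct cluster_recovery_failed : std::runtime_error {
    cluster_recovery_failed() : std::runtime_error("cluster_recovery_failed") {}
};

// Sketch of the transformErrors idea: run a task and, if it fails for any
// reason, surface a single well-known error so the recovery driver sees one
// failure mode regardless of which worker RPC actually failed.
template <class T>
T transformErrors(const std::function<T()>& task) {
    try {
        return task();
    } catch (...) {
        throw cluster_recovery_failed();
    }
}

int main() {
    try {
        transformErrors<int>([]() -> int { throw std::runtime_error("tlog timed out"); });
    } catch (const cluster_recovery_failed& e) {
        std::cout << "recruitment failed as: " << e.what() << "\n"; // single failure mode
    }
    return 0;
}
```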

View File

@ -163,17 +163,25 @@ struct ClusterControllerFullInterface {
RequestStream<struct GetServerDBInfoRequest>
getServerDBInfo; // only used by testers; the cluster controller will send the serverDBInfo to workers
RequestStream<struct UpdateWorkerHealthRequest> updateWorkerHealth;
RequestStream<struct TLogRejoinRequest>
tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new controller
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
RequestStream<struct ChangeCoordinatorsRequest> changeCoordinators;
UID id() const { return clientInterface.id(); }
bool operator==(ClusterControllerFullInterface const& r) const { return id() == r.id(); }
bool operator!=(ClusterControllerFullInterface const& r) const { return id() != r.id(); }
NetworkAddress address() const { return clientInterface.address(); }
bool hasMessage() const {
return clientInterface.hasMessage() || recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() || recruitStorage.getFuture().isReady() ||
recruitBlobWorker.getFuture().isReady() || registerWorker.getFuture().isReady() ||
getWorkers.getFuture().isReady() || registerMaster.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady() || updateWorkerHealth.getFuture().isReady();
getServerDBInfo.getFuture().isReady() || updateWorkerHealth.getFuture().isReady() ||
tlogRejoin.getFuture().isReady() || notifyBackupWorkerDone.getFuture().isReady() ||
changeCoordinators.getFuture().isReady();
}
void initEndpoints() {
@ -187,6 +195,9 @@ struct ClusterControllerFullInterface {
registerMaster.getEndpoint(TaskPriority::ClusterControllerRegister);
getServerDBInfo.getEndpoint(TaskPriority::ClusterController);
updateWorkerHealth.getEndpoint(TaskPriority::ClusterController);
tlogRejoin.getEndpoint(TaskPriority::MasterTLogRejoin);
notifyBackupWorkerDone.getEndpoint(TaskPriority::ClusterController);
changeCoordinators.getEndpoint(TaskPriority::DefaultEndpoint);
}
template <class Ar>
@ -204,7 +215,10 @@ struct ClusterControllerFullInterface {
getWorkers,
registerMaster,
getServerDBInfo,
updateWorkerHealth);
updateWorkerHealth,
tlogRejoin,
notifyBackupWorkerDone,
changeCoordinators);
}
};
@ -323,10 +337,11 @@ struct RecruitRemoteFromConfigurationReply {
constexpr static FileIdentifier file_identifier = 9091392;
std::vector<WorkerInterface> remoteTLogs;
std::vector<WorkerInterface> logRouters;
Optional<UID> dbgId;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, remoteTLogs, logRouters);
serializer(ar, remoteTLogs, logRouters, dbgId);
}
};
@ -336,6 +351,7 @@ struct RecruitRemoteFromConfigurationRequest {
Optional<Key> dcId;
int logRouterCount;
std::vector<UID> exclusionWorkerIds;
Optional<UID> dbgId;
ReplyPromise<RecruitRemoteFromConfigurationReply> reply;
RecruitRemoteFromConfigurationRequest() {}
@ -348,7 +364,7 @@ struct RecruitRemoteFromConfigurationRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, dbgId, reply);
}
};
@ -486,6 +502,49 @@ struct UpdateWorkerHealthRequest {
}
};
struct TLogRejoinReply {
constexpr static FileIdentifier file_identifier = 11;
// false means someone else registered, so we should re-register. true means this master is recovered, so don't
// send again to the same master.
bool masterIsRecovered;
TLogRejoinReply() = default;
explicit TLogRejoinReply(bool masterIsRecovered) : masterIsRecovered(masterIsRecovered) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, masterIsRecovered);
}
};
struct TLogRejoinRequest {
constexpr static FileIdentifier file_identifier = 15692200;
TLogInterface myInterface;
ReplyPromise<TLogRejoinReply> reply;
TLogRejoinRequest() {}
explicit TLogRejoinRequest(const TLogInterface& interf) : myInterface(interf) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, myInterface, reply);
}
};
struct BackupWorkerDoneRequest {
constexpr static FileIdentifier file_identifier = 8736351;
UID workerUID;
LogEpoch backupEpoch;
ReplyPromise<Void> reply;
BackupWorkerDoneRequest() : workerUID(), backupEpoch(-1) {}
BackupWorkerDoneRequest(UID id, LogEpoch epoch) : workerUID(id), backupEpoch(epoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, workerUID, backupEpoch, reply);
}
};
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;
@ -605,6 +664,7 @@ struct RecruitMasterRequest {
struct InitializeCommitProxyRequest {
constexpr static FileIdentifier file_identifier = 10344153;
MasterInterface master;
LifetimeToken masterLifetime;
uint64_t recoveryCount;
Version recoveryTransactionVersion;
bool firstProxy;
@ -612,19 +672,20 @@ struct InitializeCommitProxyRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, master, recoveryCount, recoveryTransactionVersion, firstProxy, reply);
serializer(ar, master, masterLifetime, recoveryCount, recoveryTransactionVersion, firstProxy, reply);
}
};
struct InitializeGrvProxyRequest {
constexpr static FileIdentifier file_identifier = 8265613;
MasterInterface master;
LifetimeToken masterLifetime;
uint64_t recoveryCount;
ReplyPromise<GrvProxyInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, master, recoveryCount, reply);
serializer(ar, master, masterLifetime, recoveryCount, reply);
}
};
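
The `TLogRejoinReply` comment above spells out the retry contract: a `false` reply means another registration won and the tLog should re-register, while `true` means recovery finished and it should stop resending to that controller. A small sketch of that loop, with a hypothetical `sendRejoin` standing in for the actual request/reply over `tlogRejoin`:

```cpp
#include <iostream>

// Hypothetical stand-in for sending TLogRejoinRequest to the current cluster
// controller and waiting for TLogRejoinReply; the real path is a flow actor.
// Returns the reply's masterIsRecovered flag. Here the first registration is
// pretended to be pre-empted and the second lands after recovery completes.
bool sendRejoin(int attempt) {
    return attempt >= 1;
}

int main() {
    // false => someone else registered, re-send to the (possibly new)
    // controller; true => recovery is done, stop re-sending to it.
    for (int attempt = 0;; ++attempt) {
        if (sendRejoin(attempt)) {
            std::cout << "rejoin acknowledged after " << attempt + 1 << " attempt(s)\n";
            break;
        }
        std::cout << "another registration won; retrying rejoin\n";
    }
    return 0;
}
```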

File diff suppressed because it is too large

View File

@ -700,8 +700,8 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
// Manually set up a master address.
NetworkAddress testAddress(IPAddress(0x13131313), 1);
testDbInfo.master.changeCoordinators =
RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ testAddress }, UID(1, 2)));
testDbInfo.master.getCommitVersion =
RequestStream<struct GetCommitVersionRequest>(Endpoint({ testAddress }, UID(1, 2)));
// First, create an empty TLogInterface, and check that it shouldn't be considered as in primary DC.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
@ -1772,12 +1772,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
startRole(Role::MASTER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.tlogRejoin);
DUMPTOKEN(recruited.changeCoordinators);
DUMPTOKEN(recruited.getCommitVersion);
DUMPTOKEN(recruited.getLiveCommittedVersion);
DUMPTOKEN(recruited.reportLiveCommittedVersion);
DUMPTOKEN(recruited.notifyBackupWorkerDone);
DUMPTOKEN(recruited.updateRecoveryData);
// printf("Recruited as masterServer\n");
Future<Void> masterProcess = masterServer(

View File

@ -94,21 +94,22 @@ ERROR( never_reply, 1104, "Never reply to the request" )
ERROR( recruitment_failed, 1200, "Recruitment of a server failed" ) // Be careful, catching this will delete the data of a storage server or tlog permanently
ERROR( move_to_removed_server, 1201, "Attempt to move keys to a storage server that was removed" )
ERROR( worker_removed, 1202, "Normal worker shut down" ) // Be careful, catching this will delete the data of a storage server or tlog permanently
ERROR( master_recovery_failed, 1203, "Master recovery failed")
ERROR( cluster_recovery_failed, 1203, "Cluster recovery failed")
ERROR( master_max_versions_in_flight, 1204, "Master hit maximum number of versions in flight" )
ERROR( master_tlog_failed, 1205, "Master terminating because a TLog failed" ) // similar to tlog_stopped, but the tlog has actually died
ERROR( tlog_failed, 1205, "Cluster recovery terminating because a TLog failed" ) // similar to tlog_stopped, but the tlog has actually died
ERROR( worker_recovery_failed, 1206, "Recovery of a worker process failed" )
ERROR( please_reboot, 1207, "Reboot of server process requested" )
ERROR( please_reboot_delete, 1208, "Reboot of server process requested, with deletion of state" )
ERROR( commit_proxy_failed, 1209, "Master terminating because a CommitProxy failed" )
ERROR( master_resolver_failed, 1210, "Master terminating because a Resolver failed" )
ERROR( resolver_failed, 1210, "Cluster recovery terminating because a Resolver failed" )
ERROR( server_overloaded, 1211, "Server is under too much load and cannot respond" )
ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup worker failed")
ERROR( backup_worker_failed, 1212, "Cluster recovery terminating because a backup worker failed")
ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
ERROR( grv_proxy_failed, 1214, "Master terminating because a GRVProxy failed" )
ERROR( grv_proxy_failed, 1214, "Cluster recovery terminating because a GRVProxy failed" )
ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" )
ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different cluster ID" )
ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )
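
The renames above keep each error's numeric code (1203, 1205, 1210, 1212) and change only the name and message text, so anything keyed on the code, such as trace queries, sees no difference. A small check of those pairs, stated as plain data rather than FDB's `Error` machinery:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

int main() {
    // Old name -> (new name, shared numeric code), taken from the hunk above;
    // because the code is unchanged, the rename is invisible on the wire.
    const std::map<std::string, std::pair<std::string, int>> renames = {
        { "master_recovery_failed",      { "cluster_recovery_failed", 1203 } },
        { "master_tlog_failed",          { "tlog_failed",             1205 } },
        { "master_resolver_failed",      { "resolver_failed",         1210 } },
        { "master_backup_worker_failed", { "backup_worker_failed",    1212 } },
    };
    // e.g. a trace search for error code 1203 still finds recovery failures
    // whether they were logged before or after the refactor.
    assert(renames.at("master_recovery_failed").second == 1203);
    return 0;
}
```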

View File

@ -84,6 +84,7 @@ enum class TaskPriority {
GetConsistentReadVersion = 8500,
GetLiveCommittedVersionReply = 8490,
GetLiveCommittedVersion = 8480,
UpdateRecoveryTransactionVersion = 8470,
DefaultPromiseEndpoint = 8000,
DefaultOnMainThread = 7500,
DefaultDelay = 7010,