diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index d248a2e316..004fc36567 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -3355,11 +3355,14 @@ ACTOR Future snapshotDatabase(Reference cx, StringRef sna g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.snapshotDatabase.Before"); } - ProxySnapRequest req(snapPayload, snapUID, debugID); - wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, req, cx->taskID, true /*atmostOnce*/ )); - if (debugID.present()) - g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), - "NativeAPI.SnapshotDatabase.After"); + choose { + when(wait(cx->onMasterProxiesChanged())) { throw operation_failed(); } + when(wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, ProxySnapRequest(snapPayload, snapUID, debugID), cx->taskID, true /*atmostOnce*/ ))) { + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), + "NativeAPI.SnapshotDatabase.After"); + } + } } catch (Error& e) { TraceEvent("NativeAPI.SnapshotDatabaseError") .detail("SnapPayload", snapPayload) @@ -3370,11 +3373,11 @@ ACTOR Future snapshotDatabase(Reference cx, StringRef sna return Void(); } -ACTOR Future snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) { +ACTOR Future snapCreate(Database cx, StringRef snapCmd, UID snapUID) { // remember the client ID before the snap operation state UID preSnapClientUID = cx->clientInfo->get().id; - TraceEvent("SnapCreateCoreEnter") + TraceEvent("SnapCreateEnter") .detail("SnapCmd", snapCmd.toString()) .detail("UID", snapUID) .detail("PreSnapClientUID", preSnapClientUID); @@ -3392,7 +3395,7 @@ ACTOR Future snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) { Future exec = snapshotDatabase(Reference::addRef(cx.getPtr()), snapPayloadRef, snapUID, snapUID); wait(exec); } catch (Error& e) { - TraceEvent("SnapCreateCoreError") + 
TraceEvent("SnapCreateError") .detail("SnapCmd", snapCmd.toString()) .detail("UID", snapUID) .error(e); @@ -3402,28 +3405,15 @@ ACTOR Future snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) { UID postSnapClientUID = cx->clientInfo->get().id; if (preSnapClientUID != postSnapClientUID) { // if the client IDs changed then we fail the snapshot - TraceEvent("SnapCreateCoreUIDMismatch") + TraceEvent("SnapCreateUIDMismatch") .detail("SnapPreSnapClientUID", preSnapClientUID) .detail("SnapPostSnapClientUID", postSnapClientUID); throw coordinators_changed(); } - TraceEvent("SnapCreateCoreExit") + TraceEvent("SnapCreateExit") .detail("SnapCmd", snapCmd.toString()) .detail("UID", snapUID) .detail("PreSnapClientUID", preSnapClientUID); return Void(); } - -ACTOR Future snapCreate(Database cx, StringRef snapCmd, UID snapUID) { - state int oldMode = wait( setDDMode( cx, 0 ) ); - try { - wait(snapCreateCore(cx, snapCmd, snapUID)); - } catch (Error& e) { - state Error err = e; - wait(success( setDDMode( cx, oldMode ) )); - throw err; - } - wait(success( setDDMode( cx, oldMode ) )); - return Void(); -} diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index e7f83431c1..48e1196b0f 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -393,9 +393,11 @@ ACTOR Future> getInitialDataDistribution( Dat BinaryReader rd( mode.get(), Unversioned() ); rd >> result->mode; } - if (!result->mode) // result->mode can be changed to 0 when we disable data distribution + if (!result->mode || !isDDEnabled()) { + // DD can be disabled persistently (result->mode = 0) or transiently (isDDEnabled() = 0) + TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD"); return result; - + } state Future> workers = getWorkers(&tr); state Future> serverList = tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ); @@ -3071,6 +3073,7 @@ ACTOR Future storageServerFailureTracker( Version addedVersion ) { state 
StorageServerInterface interf = server->lastKnownInterface; + state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2; loop { state bool inHealthyZone = self->healthyZone.get().present() && interf.locality.zoneId() == self->healthyZone.get(); if(inHealthyZone) { @@ -3103,7 +3106,7 @@ ACTOR Future storageServerFailureTracker( choose { when ( wait(healthChanged) ) { status->isFailed = !status->isFailed; - if(!status->isFailed && (server->teams.size() < SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER || self->lastBuildTeamsFailed)) { + if(!status->isFailed && (server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) { self->doBuildTeams = true; } if(status->isFailed && self->healthyZone.get().present() && self->clearHealthyZoneFuture.isReady()) { @@ -3140,6 +3143,7 @@ ACTOR Future storageServerTracker( state Future storeTracker = keyValueStoreTypeTracker( self, server ); state bool hasWrongStoreTypeOrDC = false; + state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2; try { loop { @@ -3225,7 +3229,7 @@ ACTOR Future storageServerTracker( self->restartRecruiting.trigger(); if (lastIsUnhealthy && !status.isUnhealthy() && - ( server->teams.size() < SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER || self->lastBuildTeamsFailed)) { + ( server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) { self->doBuildTeams = true; self->restartTeamBuilder.trigger(); // This does not trigger building teams if there exist healthy teams } @@ -3689,12 +3693,21 @@ ACTOR Future waitForDataDistributionEnabled( Database cx ) { try { Optional mode = wait( tr.get( dataDistributionModeKey ) ); - if (!mode.present()) return Void(); + if (!mode.present() && isDDEnabled()) { + TraceEvent("WaitForDDEnabledSucceeded"); + return Void(); + } if (mode.present()) { BinaryReader rd( mode.get(), Unversioned() ); int m; rd >> m; - if (m) return 
Void();
+				TraceEvent(SevDebug, "WaitForDDEnabled")
+					.detail("Mode", m)
+					.detail("IsDDEnabled", isDDEnabled());
+				if (m && isDDEnabled()) {
+					TraceEvent("WaitForDDEnabledSucceeded");
+					return Void();
+				}
 			}
 
 			tr.reset();
@@ -3709,18 +3722,32 @@ ACTOR Future isDataDistributionEnabled( Database cx ) {
 	loop {
 		try {
 			Optional mode = wait( tr.get( dataDistributionModeKey ) );
-			if (!mode.present()) return true;
+			if (!mode.present() && isDDEnabled()) return true;
 			if (mode.present()) {
 				BinaryReader rd( mode.get(), Unversioned() );
 				int m;
 				rd >> m;
-				if (m) return true;
+				if (m && isDDEnabled()) {
+					TraceEvent(SevDebug, "IsDDEnabledSucceeded")
+						.detail("Mode", m)
+						.detail("IsDDEnabled", isDDEnabled());
+					return true;
+				}
 			}
 			// SOMEDAY: Write a wrapper in MoveKeys.actor.h
 			Optional readVal = wait( tr.get( moveKeysLockOwnerKey ) );
 			UID currentOwner = readVal.present() ? BinaryReader::fromStringRef(readVal.get(), Unversioned()) : UID();
-			if( currentOwner != dataDistributionModeLock )
+			if( isDDEnabled() && (currentOwner != dataDistributionModeLock ) ) {
+				TraceEvent(SevDebug, "IsDDEnabledSucceeded")
+					.detail("CurrentOwner", currentOwner)
+					.detail("DDModeLock", dataDistributionModeLock)
+					.detail("IsDDEnabled", isDDEnabled());
 				return true;
+			}
+			TraceEvent(SevDebug, "IsDDEnabledFailed")
+				.detail("CurrentOwner", currentOwner)
+				.detail("DDModeLock", dataDistributionModeLock)
+				.detail("IsDDEnabled", isDDEnabled());
 			return false;
 		} catch (Error& e) {
 			wait( tr.onError(e) );
@@ -3889,7 +3916,10 @@ ACTOR Future dataDistribution(Reference self)
 			TraceEvent("DDInitGotInitialDD", self->ddId).detail("B","").detail("E", "").detail("Src", "[no items]").detail("Dest", "[no items]").trackLatest("InitialDD");
 		}
 
-		if (initData->mode) break; // mode may be set true by system operator using fdbcli
+		if (initData->mode && isDDEnabled()) {
+			// mode may be set true by system operator using fdbcli and isDDEnabled() set to true
+			break;
+		}
 		TraceEvent("DataDistributionDisabled", self->ddId);
TraceEvent("MovingData", self->ddId) @@ -3991,7 +4021,7 @@ ACTOR Future dataDistribution(Reference self) if( e.code() != error_code_movekeys_conflict ) throw err; bool ddEnabled = wait( isDataDistributionEnabled(cx) ); - TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled); + TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err); if( ddEnabled ) throw err; } @@ -4096,6 +4126,12 @@ ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference ddSnapCreate(DistributorSnapRequest snapReq, Reference> db ) { state Future dbInfoChange = db->onChange(); + if (!setDDEnabled(false, snapReq.snapUID)) { + // disable DD before doing snapCreate, if previous snap req has already disabled DD then this operation fails here + TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck"); + snapReq.reply.sendError(operation_failed()); + return Void(); + } double delayTime = g_network->isSimulated() ? 70.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT; try { choose { @@ -4126,9 +4162,15 @@ ACTOR Future ddSnapCreate(DistributorSnapRequest snapReq, Reference dataDistributionRelocator( DDQueueData *self, RelocateData rd relocationComplete.send( rd ); - if( e.code() != error_code_actor_cancelled ) - errorOut.sendError(e); + if( e.code() != error_code_actor_cancelled ) { + if (errorOut.canBeSet()) { + errorOut.sendError(e); + } + } throw; } } diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index b83b3e5859..eaa55dcce7 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -1668,6 +1668,7 @@ ACTOR Future masterProxyServerCore( req.reply.send(rep); } when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) { + TraceEvent(SevDebug, "SnapMasterEnqueue"); addActor.send(proxySnapCreate(snapReq, &commitData)); } when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) { diff --git 
a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp
index 6a979e3cc5..e89b57b734 100644
--- a/fdbserver/MoveKeys.actor.cpp
+++ b/fdbserver/MoveKeys.actor.cpp
@@ -28,6 +28,40 @@
 using std::min;
 using std::max;
 
+// In-memory switch that lets a snapshot request transiently disable DD; never persisted, starts enabled
+static bool ddEnabled = true;
+// UID of the request currently holding DD disabled; UID() when no disable is outstanding
+static UID ddEnabledStatusUID = UID();
+
+bool isDDEnabled() {
+	return ddEnabled;
+}
+
+// Returns true if the flag was changed, false if the request was refused (state is then left untouched)
+bool setDDEnabled(bool status, UID snapUID) {
+	TraceEvent("SetDDEnabled").detail("Status", status).detail("SnapUID", snapUID);
+	ASSERT(snapUID != UID());
+	if (!status) {
+		// disabling DD
+		if (ddEnabledStatusUID != UID()) {
+			// refuse: an earlier disable request is still outstanding
+			return false;
+		}
+		ddEnabled = status;
+		ddEnabledStatusUID = snapUID;
+		return true;
+	}
+	// enabling DD
+	if (snapUID != ddEnabledStatusUID) {
+		// refuse: only the UID that disabled DD may re-enable it
+		return false;
+	}
+	// reset to default status
+	ddEnabled = status;
+	ddEnabledStatusUID = UID();
+	return true;
+}
+
 ACTOR Future takeMoveKeysLock( Database cx, UID masterId ) {
 	state Transaction tr(cx);
 	loop {
@@ -58,6 +92,10 @@ ACTOR Future takeMoveKeysLock( Database cx, UID masterId ) {
 }
 
 ACTOR Future checkMoveKeysLock( Transaction* tr, MoveKeysLock lock, bool isWrite = true ) {
+	if (!isDDEnabled()) {
+		TraceEvent(SevDebug, "DDDisabledByInMemoryCheck");
+		throw movekeys_conflict();
+	}
 	Optional readVal = wait( tr->get( moveKeysLockOwnerKey ) );
 	UID currentOwner = readVal.present() ?
BinaryReader::fromStringRef(readVal.get(), Unversioned()) : UID(); diff --git a/fdbserver/MoveKeys.actor.h b/fdbserver/MoveKeys.actor.h index 9e44af3076..fcab3fc05d 100644 --- a/fdbserver/MoveKeys.actor.h +++ b/fdbserver/MoveKeys.actor.h @@ -47,6 +47,12 @@ Future checkMoveKeysLockReadOnly( Transaction* tr, MoveKeysLock lock ); // Checks that the a moveKeysLock has not changed since having taken it // This does not modify the moveKeysLock +bool isDDEnabled(); +// Returns the in-memory DDEnabled flag; it is true unless a setDDEnabled(false, ...) call is currently in effect + +bool setDDEnabled(bool status, UID snapUID); +// Sets the in-memory DDEnabled flag on behalf of snapUID; returns false, changing nothing, when the request is refused + void seedShardServers( Arena& trArena, CommitTransactionRef &tr,