Merge branch 'master' into fix-machine-id-parameter

This commit is contained in:
A.J. Beamon 2019-07-30 17:16:36 -07:00
commit 15474feb81
6 changed files with 116 additions and 36 deletions

View File

@ -3355,11 +3355,14 @@ ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef sna
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.snapshotDatabase.Before");
}
ProxySnapRequest req(snapPayload, snapUID, debugID);
wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, req, cx->taskID, true /*atmostOnce*/ ));
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
"NativeAPI.SnapshotDatabase.After");
choose {
when(wait(cx->onMasterProxiesChanged())) { throw operation_failed(); }
when(wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, ProxySnapRequest(snapPayload, snapUID, debugID), cx->taskID, true /*atmostOnce*/ ))) {
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
"NativeAPI.SnapshotDatabase.After");
}
}
} catch (Error& e) {
TraceEvent("NativeAPI.SnapshotDatabaseError")
.detail("SnapPayload", snapPayload)
@ -3370,11 +3373,11 @@ ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef sna
return Void();
}
ACTOR Future<Void> snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) {
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID) {
// remember the client ID before the snap operation
state UID preSnapClientUID = cx->clientInfo->get().id;
TraceEvent("SnapCreateCoreEnter")
TraceEvent("SnapCreateEnter")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
@ -3392,7 +3395,7 @@ ACTOR Future<Void> snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) {
Future<Void> exec = snapshotDatabase(Reference<DatabaseContext>::addRef(cx.getPtr()), snapPayloadRef, snapUID, snapUID);
wait(exec);
} catch (Error& e) {
TraceEvent("SnapCreateCoreError")
TraceEvent("SnapCreateError")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.error(e);
@ -3402,28 +3405,15 @@ ACTOR Future<Void> snapCreateCore(Database cx, StringRef snapCmd, UID snapUID) {
UID postSnapClientUID = cx->clientInfo->get().id;
if (preSnapClientUID != postSnapClientUID) {
// if the client IDs changed then we fail the snapshot
TraceEvent("SnapCreateCoreUIDMismatch")
TraceEvent("SnapCreateUIDMismatch")
.detail("SnapPreSnapClientUID", preSnapClientUID)
.detail("SnapPostSnapClientUID", postSnapClientUID);
throw coordinators_changed();
}
TraceEvent("SnapCreateCoreExit")
TraceEvent("SnapCreateExit")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID)
.detail("PreSnapClientUID", preSnapClientUID);
return Void();
}
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID) {
state int oldMode = wait( setDDMode( cx, 0 ) );
try {
wait(snapCreateCore(cx, snapCmd, snapUID));
} catch (Error& e) {
state Error err = e;
wait(success( setDDMode( cx, oldMode ) ));
throw err;
}
wait(success( setDDMode( cx, oldMode ) ));
return Void();
}

View File

@ -393,9 +393,11 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
BinaryReader rd( mode.get(), Unversioned() );
rd >> result->mode;
}
if (!result->mode) // result->mode can be changed to 0 when we disable data distribution
if (!result->mode || !isDDEnabled()) {
// DD can be disabled persistently (result->mode = 0) or transiently (isDDEnabled() = 0)
TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD");
return result;
}
state Future<vector<ProcessData>> workers = getWorkers(&tr);
state Future<Standalone<RangeResultRef>> serverList = tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY );
@ -3071,6 +3073,7 @@ ACTOR Future<Void> storageServerFailureTracker(
Version addedVersion )
{
state StorageServerInterface interf = server->lastKnownInterface;
state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
loop {
state bool inHealthyZone = self->healthyZone.get().present() && interf.locality.zoneId() == self->healthyZone.get();
if(inHealthyZone) {
@ -3103,7 +3106,7 @@ ACTOR Future<Void> storageServerFailureTracker(
choose {
when ( wait(healthChanged) ) {
status->isFailed = !status->isFailed;
if(!status->isFailed && (server->teams.size() < SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER || self->lastBuildTeamsFailed)) {
if(!status->isFailed && (server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) {
self->doBuildTeams = true;
}
if(status->isFailed && self->healthyZone.get().present() && self->clearHealthyZoneFuture.isReady()) {
@ -3140,6 +3143,7 @@ ACTOR Future<Void> storageServerTracker(
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
state bool hasWrongStoreTypeOrDC = false;
state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
try {
loop {
@ -3225,7 +3229,7 @@ ACTOR Future<Void> storageServerTracker(
self->restartRecruiting.trigger();
if (lastIsUnhealthy && !status.isUnhealthy() &&
( server->teams.size() < SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER || self->lastBuildTeamsFailed)) {
( server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) {
self->doBuildTeams = true;
self->restartTeamBuilder.trigger(); // This does not trigger building teams if there exist healthy teams
}
@ -3689,12 +3693,21 @@ ACTOR Future<Void> waitForDataDistributionEnabled( Database cx ) {
try {
Optional<Value> mode = wait( tr.get( dataDistributionModeKey ) );
if (!mode.present()) return Void();
if (!mode.present() && isDDEnabled()) {
TraceEvent("WaitForDDEnabledSucceeded");
return Void();
}
if (mode.present()) {
BinaryReader rd( mode.get(), Unversioned() );
int m;
rd >> m;
if (m) return Void();
TraceEvent(SevDebug, "WaitForDDEnabled")
.detail("Mode", m)
.detail("IsDDEnabled()", isDDEnabled());
if (m && isDDEnabled()) {
TraceEvent("WaitForDDEnabledSucceeded");
return Void();
}
}
tr.reset();
@ -3709,18 +3722,32 @@ ACTOR Future<bool> isDataDistributionEnabled( Database cx ) {
loop {
try {
Optional<Value> mode = wait( tr.get( dataDistributionModeKey ) );
if (!mode.present()) return true;
if (!mode.present() && isDDEnabled()) return true;
if (mode.present()) {
BinaryReader rd( mode.get(), Unversioned() );
int m;
rd >> m;
if (m) return true;
if (m && isDDEnabled()) {
TraceEvent(SevDebug, "IsDDEnabledSucceeded")
.detail("Mode", m)
.detail("IsDDEnabled()", isDDEnabled());
return true;
}
}
// SOMEDAY: Write a wrapper in MoveKeys.actor.h
Optional<Value> readVal = wait( tr.get( moveKeysLockOwnerKey ) );
UID currentOwner = readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
if( currentOwner != dataDistributionModeLock )
if( isDDEnabled() && (currentOwner != dataDistributionModeLock ) ) {
TraceEvent(SevDebug, "IsDDEnabledSucceeded")
.detail("CurrentOwner", currentOwner)
.detail("DDModeLock", dataDistributionModeLock)
.detail("IsDDEnabled", isDDEnabled());
return true;
}
TraceEvent(SevDebug, "IsDDEnabledFailed")
.detail("CurrentOwner", currentOwner)
.detail("DDModeLock", dataDistributionModeLock)
.detail("IsDDEnabled", isDDEnabled());
return false;
} catch (Error& e) {
wait( tr.onError(e) );
@ -3889,7 +3916,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
TraceEvent("DDInitGotInitialDD", self->ddId).detail("B","").detail("E", "").detail("Src", "[no items]").detail("Dest", "[no items]").trackLatest("InitialDD");
}
if (initData->mode) break; // mode may be set true by system operator using fdbcli
if (initData->mode && isDDEnabled()) {
// mode may be set true by system operator using fdbcli and isDDEnabled() set to true
break;
}
TraceEvent("DataDistributionDisabled", self->ddId);
TraceEvent("MovingData", self->ddId)
@ -3991,7 +4021,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
if( e.code() != error_code_movekeys_conflict )
throw err;
bool ddEnabled = wait( isDataDistributionEnabled(cx) );
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled);
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if( ddEnabled )
throw err;
}
@ -4096,6 +4126,12 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<Void> dbInfoChange = db->onChange();
if (!setDDEnabled(false, snapReq.snapUID)) {
// disable DD before doing snapCreate, if previous snap req has already disabled DD then this operation fails here
TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck");
snapReq.reply.sendError(operation_failed());
return Void();
}
double delayTime = g_network->isSimulated() ? 70.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
try {
choose {
@ -4126,9 +4162,15 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncV
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
// enable DD should always succeed
bool success = setDDEnabled(true, snapReq.snapUID);
ASSERT(success);
throw e;
}
}
// enable DD should always succeed
bool success = setDDEnabled(true, snapReq.snapUID);
ASSERT(success);
return Void();
}

View File

@ -1096,8 +1096,11 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
relocationComplete.send( rd );
if( e.code() != error_code_actor_cancelled )
errorOut.sendError(e);
if( e.code() != error_code_actor_cancelled ) {
if (errorOut.canBeSet()) {
errorOut.sendError(e);
}
}
throw;
}
}

View File

@ -1668,6 +1668,7 @@ ACTOR Future<Void> masterProxyServerCore(
req.reply.send(rep);
}
when(ProxySnapRequest snapReq = waitNext(proxy.proxySnapReq.getFuture())) {
TraceEvent(SevDebug, "SnapMasterEnqueue");
addActor.send(proxySnapCreate(snapReq, &commitData));
}
when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {

View File

@ -28,6 +28,40 @@
using std::min;
using std::max;
// in-memory flag to disable DD
bool ddEnabled = true;
UID ddEnabledStatusUID = UID();
bool isDDEnabled() {
return ddEnabled;
}
bool setDDEnabled(bool status, UID snapUID) {
TraceEvent("SetDDEnabled")
.detail("Status", status)
.detail("SnapUID", snapUID);
ASSERT(snapUID != UID());
if (!status) {
// disabling DD
if (ddEnabledStatusUID != UID()) {
// disable DD when a disable is already in progress not allowed
return false;
}
ddEnabled = status;
ddEnabledStatusUID = snapUID;
return true;
}
// enabling DD
if (snapUID != ddEnabledStatusUID) {
// enabling DD not allowed if UID does not match with the disable request
return false;
}
// reset to default status
ddEnabled = status;
ddEnabledStatusUID = UID();
return true;
}
ACTOR Future<MoveKeysLock> takeMoveKeysLock( Database cx, UID masterId ) {
state Transaction tr(cx);
loop {
@ -58,6 +92,10 @@ ACTOR Future<MoveKeysLock> takeMoveKeysLock( Database cx, UID masterId ) {
}
ACTOR Future<Void> checkMoveKeysLock( Transaction* tr, MoveKeysLock lock, bool isWrite = true ) {
if (!isDDEnabled()) {
TraceEvent(SevDebug, "DDDisabledByInMemoryCheck");
throw movekeys_conflict();
}
Optional<Value> readVal = wait( tr->get( moveKeysLockOwnerKey ) );
UID currentOwner = readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();

View File

@ -47,6 +47,12 @@ Future<Void> checkMoveKeysLockReadOnly( Transaction* tr, MoveKeysLock lock );
// Checks that the a moveKeysLock has not changed since having taken it
// This does not modify the moveKeysLock
bool isDDEnabled();
// checks if the in-memory DDEnabled flag is set
bool setDDEnabled(bool status, UID snapUID);
// sets the in-memory DDEnabled flag
void seedShardServers(
Arena& trArena,
CommitTransactionRef &tr,