Merge pull request #135 from cie/switch-for-data-distribution
Add a switch to turn off data distribution in CLI
This commit is contained in:
commit
47a37f3f1e
|
@ -504,6 +504,7 @@ void initHelp() {
|
|||
"If no addresses are specified, populates the list of processes which can be killed. Processes cannot be killed before this list has been populated.\n\nIf `all' is specified, attempts to kill all known processes.\n\nIf `list' is specified, displays all known processes. This is only useful when the database is unresponsive.\n\nFor each IP:port pair in <ADDRESS>*, attempt to kill the specified process.");
|
||||
|
||||
hiddenCommands.insert("expensive_data_check");
|
||||
hiddenCommands.insert("datadistribution");
|
||||
}
|
||||
|
||||
void printVersion() {
|
||||
|
@ -2717,6 +2718,25 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tokencmp(tokens[0], "datadistribution")) {
|
||||
if (tokens.size() != 2) {
|
||||
printf("Usage: datadistribution <on|off>\n");
|
||||
is_error = true;
|
||||
} else {
|
||||
if(tokencmp(tokens[1], "on")) {
|
||||
int _ = wait(setDDMode(db, 1));
|
||||
printf("Data distribution is enabled\n");
|
||||
} else if(tokencmp(tokens[1], "off")) {
|
||||
int _ = wait(setDDMode(db, 0));
|
||||
printf("Data distribution is disabled\n");
|
||||
} else {
|
||||
printf("Usage: datadistribution <on|off>\n");
|
||||
is_error = true;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tokencmp(tokens[0], "option")) {
|
||||
if (tokens.size() == 2 || tokens.size() > 4) {
|
||||
printUsage(tokens[0]);
|
||||
|
|
|
@ -1044,6 +1044,39 @@ ACTOR Future<vector<AddressExclusion>> getExcludedServers( Database cx ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<int> setDDMode( Database cx, int mode ) {
|
||||
state Transaction tr(cx);
|
||||
state int oldMode = -1;
|
||||
state BinaryWriter wr(Unversioned());
|
||||
wr << mode;
|
||||
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> old = wait( tr.get( dataDistributionModeKey ) );
|
||||
if (oldMode < 0) {
|
||||
oldMode = 1;
|
||||
if (old.present()) {
|
||||
BinaryReader rd(old.get(), Unversioned());
|
||||
rd >> oldMode;
|
||||
}
|
||||
}
|
||||
if (!mode) {
|
||||
BinaryWriter wrMyOwner(Unversioned());
|
||||
wrMyOwner << dataDistributionModeLock;
|
||||
tr.set( moveKeysLockOwnerKey, wrMyOwner.toStringRef() );
|
||||
}
|
||||
|
||||
tr.set( dataDistributionModeKey, wr.toStringRef() );
|
||||
|
||||
Void _ = wait( tr.commit() );
|
||||
return oldMode;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("setDDModeRetrying").error(e);
|
||||
Void _ = wait (tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion> excl ) {
|
||||
state std::set<AddressExclusion> exclusions( excl.begin(), excl.end() );
|
||||
|
||||
|
|
|
@ -153,6 +153,8 @@ Future<Void> unlockDatabase( Database const& cx, UID const& id );
|
|||
Future<Void> checkDatabaseLock( Transaction* const& tr, UID const& id );
|
||||
Future<Void> checkDatabaseLock( Reference<ReadYourWritesTransaction> const& tr, UID const& id );
|
||||
|
||||
Future<int> setDDMode( Database const& cx, int const& mode );
|
||||
|
||||
// Gets the cluster connection string
|
||||
Future<std::vector<NetworkAddress>> getCoordinators( Database const& cx );
|
||||
#endif
|
|
@ -1878,54 +1878,6 @@ ACTOR Future<bool> isDataDistributionEnabled( Database cx ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<int> disableDataDistribution( Database cx ) {
|
||||
state Transaction tr(cx);
|
||||
state int oldMode = -1;
|
||||
state BinaryWriter wr(Unversioned());
|
||||
wr << 0;
|
||||
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> old = wait( tr.get( dataDistributionModeKey ) );
|
||||
if (oldMode < 0) {
|
||||
oldMode = 1;
|
||||
if (old.present()) {
|
||||
BinaryReader rd(old.get(), Unversioned());
|
||||
rd >> oldMode;
|
||||
}
|
||||
}
|
||||
// SOMEDAY: Write a wrapper in MoveKeys.h
|
||||
BinaryWriter wrMyOwner(Unversioned()); wrMyOwner << dataDistributionModeLock;
|
||||
tr.set( moveKeysLockOwnerKey, wrMyOwner.toStringRef() );
|
||||
tr.set( dataDistributionModeKey, wr.toStringRef() );
|
||||
|
||||
Void _ = wait( tr.commit() );
|
||||
return oldMode;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("disableDDModeRetrying").error(e);
|
||||
Void _ = wait ( tr.onError(e) );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> enableDataDistribution( Database cx, int mode ) {
|
||||
state Transaction tr(cx);
|
||||
state BinaryWriter wr(Unversioned());
|
||||
wr << mode;
|
||||
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> old = wait( tr.get( dataDistributionModeKey ) );
|
||||
tr.set( dataDistributionModeKey, wr.toStringRef() );
|
||||
Void _ = wait( tr.commit() );
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
TraceEvent("enableDDModeRetrying").error(e);
|
||||
Void _ = wait( tr.onError(e) );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Ensures that the serverKeys key space is properly coalesced
|
||||
//This method is only used for testing and is not implemented in a manner that is safe for large databases
|
||||
ACTOR Future<Void> debugCheckCoalescing(Database cx) {
|
||||
|
|
|
@ -210,7 +210,4 @@ struct ShardSizeBounds {
|
|||
ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
|
||||
|
||||
//Determines the maximum shard size based on the size of the database
|
||||
int64_t getMaxShardSize( double dbSizeEstimate );
|
||||
|
||||
Future<Void> enableDataDistribution( Database const& cx, int const& mode );
|
||||
Future<int> disableDataDistribution( Database const& cx );
|
||||
int64_t getMaxShardSize( double dbSizeEstimate );
|
|
@ -21,6 +21,7 @@
|
|||
#include "flow/actorcompiler.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbclient/ManagementAPI.h"
|
||||
#include "fdbserver/MoveKeys.h"
|
||||
#include "fdbclient/NativeAPI.h"
|
||||
#include "workloads.h"
|
||||
|
@ -65,15 +66,13 @@ struct MoveKeysWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
state int oldMode = wait( self->setDDMode( cx, 0 ) );
|
||||
state int oldMode = wait( setDDMode( cx, 0 ) );
|
||||
TraceEvent("RMKStartModeSetting");
|
||||
Void _ = wait( timeout( reportErrors( self->worker( cx, self ), "moveKeysWorkloadWorkerError" ), self->testDuration, Void() ) );
|
||||
// Always set the DD mode back, even if we die with an error
|
||||
TraceEvent("RMKDoneMoving");
|
||||
int _ = wait( self->setDDMode( cx, oldMode ) );
|
||||
int _ = wait( setDDMode( cx, oldMode ) );
|
||||
TraceEvent("RMKDoneModeSetting");
|
||||
Void _ = wait( self->forceMasterFailure(cx, self) );
|
||||
TraceEvent("RMKDoneKillingMaster");
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -116,33 +115,6 @@ struct MoveKeysWorkload : TestWorkload {
|
|||
return vector<StorageServerInterface>(t.begin(), t.end());
|
||||
}
|
||||
|
||||
ACTOR Future<int> setDDMode( Database cx, int mode ) {
|
||||
state Transaction tr(cx);
|
||||
state int oldMode = -1;
|
||||
state BinaryWriter wr(Unversioned());
|
||||
wr << mode;
|
||||
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> old = wait( tr.get( dataDistributionModeKey ) );
|
||||
if (oldMode < 0) {
|
||||
oldMode = 1;
|
||||
if (old.present()) {
|
||||
BinaryReader rd(old.get(), Unversioned());
|
||||
rd >> oldMode;
|
||||
}
|
||||
}
|
||||
tr.set( dataDistributionModeKey, wr.toStringRef() );
|
||||
|
||||
Void _ = wait( tr.commit() );
|
||||
return oldMode;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("setDDModeRetrying").error(e);
|
||||
Void _ = wait (tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doMoveKeys(Database cx, MoveKeysWorkload *self, KeyRange keys, vector<StorageServerInterface> destinationTeam,
|
||||
MoveKeysLock lock, std::string dbName ) {
|
||||
state TraceInterval relocateShardInterval("RelocateShard");
|
||||
|
|
Loading…
Reference in New Issue