Merge branch 'main' into feature-tenant-groups
This commit is contained in:
commit
f6e3019a10
|
@ -561,6 +561,7 @@ def profile(logger):
|
||||||
assert output2 == default_profile_client_get_output
|
assert output2 == default_profile_client_get_output
|
||||||
# set rate and size limit
|
# set rate and size limit
|
||||||
run_fdbcli_command('profile', 'client', 'set', '0.5', '1GB')
|
run_fdbcli_command('profile', 'client', 'set', '0.5', '1GB')
|
||||||
|
time.sleep(1) # global config can take some time to sync
|
||||||
output3 = run_fdbcli_command('profile', 'client', 'get')
|
output3 = run_fdbcli_command('profile', 'client', 'get')
|
||||||
logger.debug(output3)
|
logger.debug(output3)
|
||||||
output3_list = output3.split(' ')
|
output3_list = output3.split(' ')
|
||||||
|
@ -569,6 +570,7 @@ def profile(logger):
|
||||||
assert output3_list[-1] == '1000000000.'
|
assert output3_list[-1] == '1000000000.'
|
||||||
# change back to default value and check
|
# change back to default value and check
|
||||||
run_fdbcli_command('profile', 'client', 'set', 'default', 'default')
|
run_fdbcli_command('profile', 'client', 'set', 'default', 'default')
|
||||||
|
time.sleep(1) # global config can take some time to sync
|
||||||
assert run_fdbcli_command('profile', 'client', 'get') == default_profile_client_get_output
|
assert run_fdbcli_command('profile', 'client', 'get') == default_profile_client_get_output
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -203,6 +203,10 @@ void ClientKnobs::initialize(Randomize randomize) {
|
||||||
init( DEFAULT_COMMIT_GRV_PROXIES_RATIO, 3 );
|
init( DEFAULT_COMMIT_GRV_PROXIES_RATIO, 3 );
|
||||||
init( DEFAULT_MAX_GRV_PROXIES, 4 );
|
init( DEFAULT_MAX_GRV_PROXIES, 4 );
|
||||||
|
|
||||||
|
init( GLOBAL_CONFIG_REFRESH_BACKOFF, 0.5 );
|
||||||
|
init( GLOBAL_CONFIG_REFRESH_MAX_BACKOFF, 60.0 );
|
||||||
|
init( GLOBAL_CONFIG_REFRESH_TIMEOUT, 10.0 );
|
||||||
|
|
||||||
init( IS_ACCEPTABLE_DELAY, 1.5 );
|
init( IS_ACCEPTABLE_DELAY, 1.5 );
|
||||||
|
|
||||||
init( HTTP_READ_SIZE, 128*1024 );
|
init( HTTP_READ_SIZE, 128*1024 );
|
||||||
|
|
|
@ -101,7 +101,7 @@ void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any
|
||||||
}
|
}
|
||||||
|
|
||||||
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||||
// TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
|
// TraceEvent(SevInfo, "GlobalConfigInsert").detail("Key", key).detail("Value", value);
|
||||||
data.erase(key);
|
data.erase(key);
|
||||||
|
|
||||||
Arena arena(key.expectedSize() + value.expectedSize());
|
Arena arena(key.expectedSize() + value.expectedSize());
|
||||||
|
@ -139,7 +139,7 @@ void GlobalConfig::erase(Key key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void GlobalConfig::erase(KeyRangeRef range) {
|
void GlobalConfig::erase(KeyRangeRef range) {
|
||||||
// TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
|
// TraceEvent(SevInfo, "GlobalConfigErase").detail("Range", range);
|
||||||
auto it = data.begin();
|
auto it = data.begin();
|
||||||
while (it != data.end()) {
|
while (it != data.end()) {
|
||||||
if (range.contains(it->first)) {
|
if (range.contains(it->first)) {
|
||||||
|
@ -153,95 +153,29 @@ void GlobalConfig::erase(KeyRangeRef range) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Older FDB versions used different keys for client profiling data. This
|
|
||||||
// function performs a one-time migration of data in these keys to the new
|
|
||||||
// global configuration key space.
|
|
||||||
ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
|
||||||
state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);
|
|
||||||
state Reference<ReadYourWritesTransaction> tr;
|
|
||||||
try {
|
|
||||||
state Backoff backoff;
|
|
||||||
loop {
|
|
||||||
tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(self->cx)));
|
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
||||||
|
|
||||||
try {
|
|
||||||
state Optional<Value> migrated = wait(tr->get(migratedKey));
|
|
||||||
if (migrated.present()) {
|
|
||||||
// Already performed migration.
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
state Optional<Value> sampleRate =
|
|
||||||
wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
|
|
||||||
state Optional<Value> sizeLimit =
|
|
||||||
wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
|
|
||||||
|
|
||||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
|
||||||
// The value doesn't matter too much, as long as the key is set.
|
|
||||||
tr->set(migratedKey.contents(), "1"_sr);
|
|
||||||
if (sampleRate.present()) {
|
|
||||||
const double sampleRateDbl =
|
|
||||||
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
|
||||||
Tuple rate = Tuple::makeTuple(sampleRateDbl);
|
|
||||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
|
||||||
}
|
|
||||||
if (sizeLimit.present()) {
|
|
||||||
const int64_t sizeLimitInt =
|
|
||||||
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
|
||||||
Tuple size = Tuple::makeTuple(sizeLimitInt);
|
|
||||||
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
|
||||||
}
|
|
||||||
|
|
||||||
wait(tr->commit());
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
// If multiple fdbserver processes are started at once, they will all
|
|
||||||
// attempt this migration at the same time, sometimes resulting in
|
|
||||||
// aborts due to conflicts. Purposefully avoid retrying, making this
|
|
||||||
// migration best-effort.
|
|
||||||
TraceEvent(SevInfo, "GlobalConfig_RetryableMigrationError").errorUnsuppressed(e).suppressFor(1.0);
|
|
||||||
wait(tr->onError(e));
|
|
||||||
tr.clear();
|
|
||||||
// tr is cleared, so it won't backoff properly. Use custom backoff logic here.
|
|
||||||
wait(backoff.onError());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Error& e) {
|
|
||||||
// Catch non-retryable errors (and do nothing).
|
|
||||||
TraceEvent(SevWarnAlways, "GlobalConfig_MigrationError").error(e);
|
|
||||||
}
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updates local copy of global configuration by reading the entire key-range
|
// Updates local copy of global configuration by reading the entire key-range
|
||||||
// from storage.
|
// from storage (proxied through the GrvProxies).
|
||||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown) {
|
||||||
// TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
|
// TraceEvent trace(SevInfo, "GlobalConfigRefresh");
|
||||||
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
||||||
|
|
||||||
state Backoff backoff;
|
state Backoff backoff(CLIENT_KNOBS->GLOBAL_CONFIG_REFRESH_BACKOFF, CLIENT_KNOBS->GLOBAL_CONFIG_REFRESH_MAX_BACKOFF);
|
||||||
|
|
||||||
state Reference<ReadYourWritesTransaction> tr;
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(self->cx)));
|
GlobalConfigRefreshReply reply =
|
||||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
wait(timeoutError(basicLoadBalance(self->cx->getGrvProxies(UseProvisionalProxies::False),
|
||||||
RangeResult result = wait(tr->getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
|
&GrvProxyInterface::refreshGlobalConfig,
|
||||||
for (const auto& kv : result) {
|
GlobalConfigRefreshRequest{ lastKnown }),
|
||||||
|
CLIENT_KNOBS->GLOBAL_CONFIG_REFRESH_TIMEOUT));
|
||||||
|
for (const auto& kv : reply.result) {
|
||||||
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
|
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
|
||||||
self->insert(systemKey, kv.value);
|
self->insert(systemKey, kv.value);
|
||||||
}
|
}
|
||||||
break;
|
return Void();
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
TraceEvent("GlobalConfigRefreshError").errorUnsuppressed(e).suppressFor(1.0);
|
|
||||||
wait(tr->onError(e));
|
|
||||||
tr.clear();
|
|
||||||
// tr is cleared, so it won't backoff properly. Use custom backoff logic here.
|
|
||||||
wait(backoff.onError());
|
wait(backoff.onError());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Void();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Applies updates to the local copy of the global configuration when this
|
// Applies updates to the local copy of the global configuration when this
|
||||||
|
@ -251,9 +185,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo*
|
||||||
try {
|
try {
|
||||||
if (self->initialized.canBeSet()) {
|
if (self->initialized.canBeSet()) {
|
||||||
wait(self->cx->onConnected());
|
wait(self->cx->onConnected());
|
||||||
wait(self->migrate(self));
|
|
||||||
|
|
||||||
wait(self->refresh(self));
|
wait(self->refresh(self, -1));
|
||||||
self->initialized.send(Void());
|
self->initialized.send(Void());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,7 +203,7 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo*
|
||||||
// This process missed too many global configuration
|
// This process missed too many global configuration
|
||||||
// history updates or the protocol version changed, so it
|
// history updates or the protocol version changed, so it
|
||||||
// must re-read the entire configuration range.
|
// must re-read the entire configuration range.
|
||||||
wait(self->refresh(self));
|
wait(self->refresh(self, history.back().version));
|
||||||
if (dbInfo->history.size() > 0) {
|
if (dbInfo->history.size() > 0) {
|
||||||
self->lastUpdate = dbInfo->history.back().version;
|
self->lastUpdate = dbInfo->history.back().version;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1475,7 +1475,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
||||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
||||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
||||||
|
|
||||||
dbId = deterministicRandom()->randomUniqueID();
|
dbId = deterministicRandom()->randomUniqueID();
|
||||||
|
|
||||||
|
TraceEvent("DatabaseContextCreated", dbId).backtrace();
|
||||||
|
|
||||||
connected = (clientInfo->get().commitProxies.size() && clientInfo->get().grvProxies.size())
|
connected = (clientInfo->get().commitProxies.size() && clientInfo->get().grvProxies.size())
|
||||||
? Void()
|
? Void()
|
||||||
: clientInfo->onChange();
|
: clientInfo->onChange();
|
||||||
|
@ -1804,6 +1808,8 @@ DatabaseContext::~DatabaseContext() {
|
||||||
it->second->notifyContextDestroyed();
|
it->second->notifyContextDestroyed();
|
||||||
ASSERT_ABORT(server_interf.empty());
|
ASSERT_ABORT(server_interf.empty());
|
||||||
locationCache.insert(allKeys, Reference<LocationInfo>());
|
locationCache.insert(allKeys, Reference<LocationInfo>());
|
||||||
|
|
||||||
|
TraceEvent("DatabaseContextDestructed", dbId).backtrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<KeyRangeLocationInfo> DatabaseContext::getCachedLocation(const Optional<TenantName>& tenantName,
|
Optional<KeyRangeLocationInfo> DatabaseContext::getCachedLocation(const Optional<TenantName>& tenantName,
|
||||||
|
|
|
@ -482,6 +482,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||||
init( RESET_MASTER_DELAY, 300.0 );
|
init( RESET_MASTER_DELAY, 300.0 );
|
||||||
init( RESET_RESOLVER_DELAY, 300.0 );
|
init( RESET_RESOLVER_DELAY, 300.0 );
|
||||||
|
|
||||||
|
init( GLOBAL_CONFIG_MIGRATE_TIMEOUT, 5.0 );
|
||||||
|
init( GLOBAL_CONFIG_REFRESH_INTERVAL, 1.0 ); if ( randomize && BUGGIFY ) GLOBAL_CONFIG_REFRESH_INTERVAL = 0.1;
|
||||||
|
init( GLOBAL_CONFIG_REFRESH_TIMEOUT, 10.0 ); if ( randomize && BUGGIFY ) GLOBAL_CONFIG_REFRESH_TIMEOUT = 1.0;
|
||||||
|
|
||||||
// Master Server
|
// Master Server
|
||||||
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
|
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
|
||||||
// by delay()ing for this amount of time between accepted batches of TransactionRequests.
|
// by delay()ing for this amount of time between accepted batches of TransactionRequests.
|
||||||
|
@ -740,6 +744,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||||
init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
|
init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
|
||||||
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
|
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
|
||||||
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
|
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
|
||||||
|
init( STORAGE_SERVER_SHARD_AWARE, true );
|
||||||
|
|
||||||
//Wait Failure
|
//Wait Failure
|
||||||
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
|
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
|
||||||
|
|
|
@ -475,18 +475,21 @@ const Value serverKeysValue(const UID& id) {
|
||||||
|
|
||||||
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) {
|
void decodeServerKeysValue(const ValueRef& value, bool& assigned, bool& emptyRange, UID& id) {
|
||||||
if (value.size() == 0) {
|
if (value.size() == 0) {
|
||||||
id = UID();
|
|
||||||
assigned = false;
|
assigned = false;
|
||||||
emptyRange = false;
|
emptyRange = false;
|
||||||
|
id = UID();
|
||||||
} else if (value == serverKeysTrue) {
|
} else if (value == serverKeysTrue) {
|
||||||
assigned = true;
|
assigned = true;
|
||||||
emptyRange = false;
|
emptyRange = false;
|
||||||
|
id = anonymousShardId;
|
||||||
} else if (value == serverKeysTrueEmptyRange) {
|
} else if (value == serverKeysTrueEmptyRange) {
|
||||||
assigned = true;
|
assigned = true;
|
||||||
emptyRange = true;
|
emptyRange = true;
|
||||||
|
id = anonymousShardId;
|
||||||
} else if (value == serverKeysFalse) {
|
} else if (value == serverKeysFalse) {
|
||||||
assigned = false;
|
assigned = false;
|
||||||
emptyRange = false;
|
emptyRange = false;
|
||||||
|
id = UID();
|
||||||
} else {
|
} else {
|
||||||
BinaryReader rd(value, IncludeVersion());
|
BinaryReader rd(value, IncludeVersion());
|
||||||
ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData());
|
ASSERT(rd.protocolVersion().hasShardEncodeLocationMetaData());
|
||||||
|
|
|
@ -205,6 +205,10 @@ public:
|
||||||
int32_t DEFAULT_AUTO_RESOLVERS;
|
int32_t DEFAULT_AUTO_RESOLVERS;
|
||||||
int32_t DEFAULT_AUTO_LOGS;
|
int32_t DEFAULT_AUTO_LOGS;
|
||||||
|
|
||||||
|
double GLOBAL_CONFIG_REFRESH_BACKOFF;
|
||||||
|
double GLOBAL_CONFIG_REFRESH_MAX_BACKOFF;
|
||||||
|
double GLOBAL_CONFIG_REFRESH_TIMEOUT;
|
||||||
|
|
||||||
// Dynamic Knobs
|
// Dynamic Knobs
|
||||||
double COMMIT_QUORUM_TIMEOUT;
|
double COMMIT_QUORUM_TIMEOUT;
|
||||||
double GET_GENERATION_QUORUM_TIMEOUT;
|
double GET_GENERATION_QUORUM_TIMEOUT;
|
||||||
|
|
|
@ -552,4 +552,32 @@ struct ExclusionSafetyCheckRequest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct GlobalConfigRefreshReply {
|
||||||
|
constexpr static FileIdentifier file_identifier = 12680327;
|
||||||
|
Arena arena;
|
||||||
|
RangeResultRef result;
|
||||||
|
|
||||||
|
GlobalConfigRefreshReply() {}
|
||||||
|
GlobalConfigRefreshReply(Arena const& arena, RangeResultRef result) : arena(arena), result(result) {}
|
||||||
|
|
||||||
|
template <class Ar>
|
||||||
|
void serialize(Ar& ar) {
|
||||||
|
serializer(ar, result, arena);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct GlobalConfigRefreshRequest {
|
||||||
|
constexpr static FileIdentifier file_identifier = 2828131;
|
||||||
|
Version lastKnown;
|
||||||
|
ReplyPromise<GlobalConfigRefreshReply> reply;
|
||||||
|
|
||||||
|
GlobalConfigRefreshRequest() {}
|
||||||
|
explicit GlobalConfigRefreshRequest(Version lastKnown) : lastKnown(lastKnown) {}
|
||||||
|
|
||||||
|
template <class Ar>
|
||||||
|
void serialize(Ar& ar) {
|
||||||
|
serializer(ar, lastKnown, reply);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -638,14 +638,18 @@ private:
|
||||||
|
|
||||||
// Similar to tr.onError(), but doesn't require a DatabaseContext.
|
// Similar to tr.onError(), but doesn't require a DatabaseContext.
|
||||||
struct Backoff {
|
struct Backoff {
|
||||||
|
Backoff(double backoff = CLIENT_KNOBS->DEFAULT_BACKOFF, double maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF)
|
||||||
|
: backoff(backoff), maxBackoff(maxBackoff) {}
|
||||||
|
|
||||||
Future<Void> onError() {
|
Future<Void> onError() {
|
||||||
double currentBackoff = backoff;
|
double currentBackoff = backoff;
|
||||||
backoff = std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, CLIENT_KNOBS->DEFAULT_MAX_BACKOFF);
|
backoff = std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, maxBackoff);
|
||||||
return delay(currentBackoff * deterministicRandom()->random01());
|
return delay(currentBackoff * deterministicRandom()->random01());
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
double backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
|
double backoff;
|
||||||
|
double maxBackoff;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -163,8 +163,7 @@ private:
|
||||||
// of the global configuration keyspace.
|
// of the global configuration keyspace.
|
||||||
void erase(KeyRangeRef range);
|
void erase(KeyRangeRef range);
|
||||||
|
|
||||||
ACTOR static Future<Void> migrate(GlobalConfig* self);
|
ACTOR static Future<Void> refresh(GlobalConfig* self, Version lastKnown);
|
||||||
ACTOR static Future<Void> refresh(GlobalConfig* self);
|
|
||||||
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
|
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
|
||||||
|
|
||||||
DatabaseContext* cx;
|
DatabaseContext* cx;
|
||||||
|
|
|
@ -26,8 +26,9 @@
|
||||||
#include "fdbrpc/fdbrpc.h"
|
#include "fdbrpc/fdbrpc.h"
|
||||||
#include "fdbclient/FDBTypes.h"
|
#include "fdbclient/FDBTypes.h"
|
||||||
|
|
||||||
// GrvProxy is proxy primarily specializing on serving GetReadVersion. It also serves health metrics since it
|
// GrvProxy is proxy primarily specializing on serving GetReadVersion. It also
|
||||||
// communicates with RateKeeper to gather health information of the cluster.
|
// serves health metrics since it communicates with RateKeeper to gather health
|
||||||
|
// information of the cluster, and handles proxied GlobalConfig requests.
|
||||||
struct GrvProxyInterface {
|
struct GrvProxyInterface {
|
||||||
constexpr static FileIdentifier file_identifier = 8743216;
|
constexpr static FileIdentifier file_identifier = 8743216;
|
||||||
enum { LocationAwareLoadBalance = 1 };
|
enum { LocationAwareLoadBalance = 1 };
|
||||||
|
@ -43,6 +44,7 @@ struct GrvProxyInterface {
|
||||||
// committed)
|
// committed)
|
||||||
RequestStream<ReplyPromise<Void>> waitFailure; // reports heartbeat to master.
|
RequestStream<ReplyPromise<Void>> waitFailure; // reports heartbeat to master.
|
||||||
RequestStream<struct GetHealthMetricsRequest> getHealthMetrics;
|
RequestStream<struct GetHealthMetricsRequest> getHealthMetrics;
|
||||||
|
RequestStream<struct GlobalConfigRefreshRequest> refreshGlobalConfig;
|
||||||
|
|
||||||
UID id() const { return getConsistentReadVersion.getEndpoint().token; }
|
UID id() const { return getConsistentReadVersion.getEndpoint().token; }
|
||||||
std::string toString() const { return id().shortString(); }
|
std::string toString() const { return id().shortString(); }
|
||||||
|
@ -59,6 +61,8 @@ struct GrvProxyInterface {
|
||||||
RequestStream<ReplyPromise<Void>>(getConsistentReadVersion.getEndpoint().getAdjustedEndpoint(1));
|
RequestStream<ReplyPromise<Void>>(getConsistentReadVersion.getEndpoint().getAdjustedEndpoint(1));
|
||||||
getHealthMetrics = RequestStream<struct GetHealthMetricsRequest>(
|
getHealthMetrics = RequestStream<struct GetHealthMetricsRequest>(
|
||||||
getConsistentReadVersion.getEndpoint().getAdjustedEndpoint(2));
|
getConsistentReadVersion.getEndpoint().getAdjustedEndpoint(2));
|
||||||
|
refreshGlobalConfig = RequestStream<struct GlobalConfigRefreshRequest>(
|
||||||
|
getConsistentReadVersion.getEndpoint().getAdjustedEndpoint(3));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,6 +71,7 @@ struct GrvProxyInterface {
|
||||||
streams.push_back(getConsistentReadVersion.getReceiver(TaskPriority::ReadSocket));
|
streams.push_back(getConsistentReadVersion.getReceiver(TaskPriority::ReadSocket));
|
||||||
streams.push_back(waitFailure.getReceiver());
|
streams.push_back(waitFailure.getReceiver());
|
||||||
streams.push_back(getHealthMetrics.getReceiver());
|
streams.push_back(getHealthMetrics.getReceiver());
|
||||||
|
streams.push_back(refreshGlobalConfig.getReceiver());
|
||||||
FlowTransport::transport().addEndpoints(streams);
|
FlowTransport::transport().addEndpoints(streams);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -399,6 +399,10 @@ public:
|
||||||
double RESET_MASTER_DELAY;
|
double RESET_MASTER_DELAY;
|
||||||
double RESET_RESOLVER_DELAY;
|
double RESET_RESOLVER_DELAY;
|
||||||
|
|
||||||
|
double GLOBAL_CONFIG_MIGRATE_TIMEOUT;
|
||||||
|
double GLOBAL_CONFIG_REFRESH_INTERVAL;
|
||||||
|
double GLOBAL_CONFIG_REFRESH_TIMEOUT;
|
||||||
|
|
||||||
// Master Server
|
// Master Server
|
||||||
double COMMIT_SLEEP_TIME;
|
double COMMIT_SLEEP_TIME;
|
||||||
double MIN_BALANCE_TIME;
|
double MIN_BALANCE_TIME;
|
||||||
|
@ -698,6 +702,7 @@ public:
|
||||||
int CHECKPOINT_TRANSFER_BLOCK_BYTES;
|
int CHECKPOINT_TRANSFER_BLOCK_BYTES;
|
||||||
int QUICK_GET_KEY_VALUES_LIMIT;
|
int QUICK_GET_KEY_VALUES_LIMIT;
|
||||||
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
|
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
|
||||||
|
bool STORAGE_SERVER_SHARD_AWARE;
|
||||||
|
|
||||||
// Wait Failure
|
// Wait Failure
|
||||||
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
|
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include <ostream>
|
#include <ostream>
|
||||||
#include "fdbclient/FDBTypes.h"
|
#include "fdbclient/FDBTypes.h"
|
||||||
#include "fdbclient/StorageCheckpoint.h"
|
#include "fdbclient/StorageCheckpoint.h"
|
||||||
|
#include "fdbclient/StorageServerShard.h"
|
||||||
#include "fdbrpc/Locality.h"
|
#include "fdbrpc/Locality.h"
|
||||||
#include "fdbrpc/QueueModel.h"
|
#include "fdbrpc/QueueModel.h"
|
||||||
#include "fdbrpc/fdbrpc.h"
|
#include "fdbrpc/fdbrpc.h"
|
||||||
|
@ -572,12 +573,13 @@ struct GetShardStateReply {
|
||||||
|
|
||||||
Version first;
|
Version first;
|
||||||
Version second;
|
Version second;
|
||||||
|
std::vector<StorageServerShard> shards;
|
||||||
GetShardStateReply() = default;
|
GetShardStateReply() = default;
|
||||||
GetShardStateReply(Version first, Version second) : first(first), second(second) {}
|
GetShardStateReply(Version first, Version second) : first(first), second(second) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, first, second);
|
serializer(ar, first, second, shards);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -587,13 +589,16 @@ struct GetShardStateRequest {
|
||||||
|
|
||||||
KeyRange keys;
|
KeyRange keys;
|
||||||
int32_t mode;
|
int32_t mode;
|
||||||
|
bool includePhysicalShard;
|
||||||
ReplyPromise<GetShardStateReply> reply;
|
ReplyPromise<GetShardStateReply> reply;
|
||||||
GetShardStateRequest() {}
|
GetShardStateRequest() = default;
|
||||||
GetShardStateRequest(KeyRange const& keys, waitMode mode) : keys(keys), mode(mode) {}
|
GetShardStateRequest(KeyRange const& keys, waitMode mode, bool includePhysicalShard)
|
||||||
|
: keys(keys), mode(mode), includePhysicalShard(includePhysicalShard) {}
|
||||||
|
GetShardStateRequest(KeyRange const& keys, waitMode mode) : keys(keys), mode(mode), includePhysicalShard(false) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, keys, mode, reply);
|
serializer(ar, keys, mode, reply, includePhysicalShard);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
/*
|
||||||
|
* StorageServerShard.h
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FDBCLIENT_STORAGESERVERSHARD_H
|
||||||
|
#define FDBCLIENT_STORAGESERVERSHARD_H
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "fdbclient/FDBTypes.h"
|
||||||
|
#include "flow/flow.h"
|
||||||
|
|
||||||
|
// Represents a data shard on a storage server hosting a continuous keyrange.
|
||||||
|
struct StorageServerShard {
|
||||||
|
constexpr static FileIdentifier file_identifier = 4028358;
|
||||||
|
|
||||||
|
enum ShardState {
|
||||||
|
NotAssigned = 0,
|
||||||
|
MovingIn = 1,
|
||||||
|
ReadWritePending = 2,
|
||||||
|
ReadWrite = 3,
|
||||||
|
};
|
||||||
|
|
||||||
|
StorageServerShard() = default;
|
||||||
|
StorageServerShard(KeyRange range,
|
||||||
|
Version version,
|
||||||
|
const uint64_t id,
|
||||||
|
const uint64_t desiredId,
|
||||||
|
ShardState shardState)
|
||||||
|
: range(range), version(version), id(id), desiredId(desiredId), shardState(shardState) {}
|
||||||
|
|
||||||
|
static StorageServerShard notAssigned(KeyRange range, Version version = 0) {
|
||||||
|
return StorageServerShard(range, version, 0, 0, NotAssigned);
|
||||||
|
}
|
||||||
|
|
||||||
|
ShardState getShardState() const { return static_cast<ShardState>(this->shardState); };
|
||||||
|
|
||||||
|
void setShardState(const ShardState shardState) { this->shardState = static_cast<int8_t>(shardState); }
|
||||||
|
|
||||||
|
std::string getShardStateString() const {
|
||||||
|
const ShardState ss = getShardState();
|
||||||
|
switch (ss) {
|
||||||
|
case NotAssigned:
|
||||||
|
return "NotAssigned";
|
||||||
|
case MovingIn:
|
||||||
|
return "MovingIn";
|
||||||
|
case ReadWritePending:
|
||||||
|
return "ReadWritePending";
|
||||||
|
case ReadWrite:
|
||||||
|
return "ReadWrite";
|
||||||
|
}
|
||||||
|
return "InvalidState";
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string toString() const {
|
||||||
|
return "StorageServerShard: [Range]: " + Traceable<KeyRangeRef>::toString(range) +
|
||||||
|
" [Shard ID]: " + format("%016llx", this->id) + " [Version]: " + std::to_string(version) +
|
||||||
|
" [State]: " + getShardStateString() + " [Desired Shard ID]: " + format("%016llx", this->desiredId);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class Ar>
|
||||||
|
void serialize(Ar& ar) {
|
||||||
|
serializer(ar, range, version, id, desiredId, shardState);
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyRange range;
|
||||||
|
Version version; // Shard creation version.
|
||||||
|
uint64_t id; // The actual shard ID.
|
||||||
|
uint64_t desiredId; // The intended shard ID.
|
||||||
|
int8_t shardState;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
|
@ -209,6 +209,10 @@ Future<Optional<TenantMapEntry>> createTenant(Reference<DB> db,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deletes the tenant with the given name. If tenantId is specified, the tenant being deleted must also have the same
|
||||||
|
// ID. If no matching tenant is found, this function returns without deleting anything. This behavior allows the
|
||||||
|
// function to be used idempotently: if the transaction is retried after having succeeded, it will see that the tenant
|
||||||
|
// is absent (or optionally created with a new ID) and do nothing.
|
||||||
ACTOR template <class Transaction>
|
ACTOR template <class Transaction>
|
||||||
Future<Void> deleteTenantTransaction(Transaction tr,
|
Future<Void> deleteTenantTransaction(Transaction tr,
|
||||||
TenantNameRef name,
|
TenantNameRef name,
|
||||||
|
@ -247,6 +251,8 @@ Future<Void> deleteTenantTransaction(Transaction tr,
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deletes the tenant with the given name. If tenantId is specified, the tenant being deleted must also have the same
|
||||||
|
// ID.
|
||||||
ACTOR template <class DB>
|
ACTOR template <class DB>
|
||||||
Future<Void> deleteTenant(Reference<DB> db, TenantName name, Optional<int64_t> tenantId = Optional<int64_t>()) {
|
Future<Void> deleteTenant(Reference<DB> db, TenantName name, Optional<int64_t> tenantId = Optional<int64_t>()) {
|
||||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||||
|
@ -258,9 +264,12 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name, Optional<int64_t> t
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
|
||||||
if (checkExistence) {
|
if (checkExistence) {
|
||||||
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
|
TenantMapEntry entry = wait(getTenantTransaction(tr, name));
|
||||||
if (!entry.present()) {
|
|
||||||
throw tenant_not_found();
|
// If an ID wasn't specified, use the current ID. This way we cannot inadvertently delete
|
||||||
|
// multiple tenants if this transaction retries.
|
||||||
|
if (!tenantId.present()) {
|
||||||
|
tenantId = entry.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
checkExistence = false;
|
checkExistence = false;
|
||||||
|
|
|
@ -1592,7 +1592,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
||||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||||
state Span span("MP:reply"_loc, self->span.context);
|
state Span span("MP:reply"_loc, self->span.context);
|
||||||
|
|
||||||
const Optional<UID>& debugID = self->debugID;
|
state const Optional<UID>& debugID = self->debugID;
|
||||||
|
|
||||||
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) {
|
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) {
|
||||||
//TraceEvent("CPAdvanceMinVersion", self->pProxyCommitData->dbgid).detail("PrvVersion", self->prevVersion).detail("CommitVersion", self->commitVersion).detail("Master", self->pProxyCommitData->master.id().toString()).detail("TxSize", self->trs.size());
|
//TraceEvent("CPAdvanceMinVersion", self->pProxyCommitData->dbgid).detail("PrvVersion", self->prevVersion).detail("CommitVersion", self->commitVersion).detail("Master", self->pProxyCommitData->master.id().toString()).detail("TxSize", self->trs.size());
|
||||||
|
@ -1634,6 +1634,12 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
||||||
writtenTags),
|
writtenTags),
|
||||||
TaskPriority::ProxyMasterVersionReply));
|
TaskPriority::ProxyMasterVersionReply));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (debugID.present()) {
|
||||||
|
g_traceBatch.addEvent(
|
||||||
|
"CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.AfterReportRawCommittedVersion");
|
||||||
|
}
|
||||||
|
|
||||||
if (self->commitVersion > pProxyCommitData->committedVersion.get()) {
|
if (self->commitVersion > pProxyCommitData->committedVersion.get()) {
|
||||||
pProxyCommitData->locked = self->lockedAfter;
|
pProxyCommitData->locked = self->lockedAfter;
|
||||||
pProxyCommitData->metadataVersion = self->metadataVersionAfter;
|
pProxyCommitData->metadataVersion = self->metadataVersionAfter;
|
||||||
|
|
|
@ -18,9 +18,11 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "fdbclient/DatabaseContext.h"
|
||||||
#include "fdbclient/Knobs.h"
|
#include "fdbclient/Knobs.h"
|
||||||
#include "fdbclient/Notified.h"
|
#include "fdbclient/Notified.h"
|
||||||
#include "fdbclient/TransactionLineage.h"
|
#include "fdbclient/TransactionLineage.h"
|
||||||
|
#include "fdbclient/Tuple.h"
|
||||||
#include "fdbserver/LogSystem.h"
|
#include "fdbserver/LogSystem.h"
|
||||||
#include "fdbserver/LogSystemDiskQueueAdapter.h"
|
#include "fdbserver/LogSystemDiskQueueAdapter.h"
|
||||||
#include "fdbclient/CommitProxyInterface.h"
|
#include "fdbclient/CommitProxyInterface.h"
|
||||||
|
@ -302,6 +304,127 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Older FDB versions used different keys for client profiling data. This
|
||||||
|
// function performs a one-time migration of data in these keys to the new
|
||||||
|
// global configuration key space.
|
||||||
|
ACTOR Future<Void> globalConfigMigrate(GrvProxyData* grvProxyData) {
|
||||||
|
state Key migratedKey("\xff\x02/fdbClientInfo/migrated/"_sr);
|
||||||
|
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(grvProxyData->cx);
|
||||||
|
try {
|
||||||
|
loop {
|
||||||
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
|
||||||
|
try {
|
||||||
|
state Optional<Value> migrated = wait(tr->get(migratedKey));
|
||||||
|
if (migrated.present()) {
|
||||||
|
// Already performed migration.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
state Optional<Value> sampleRate =
|
||||||
|
wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_sample_rate/"_sr)));
|
||||||
|
state Optional<Value> sizeLimit =
|
||||||
|
wait(tr->get(Key("\xff\x02/fdbClientInfo/client_txn_size_limit/"_sr)));
|
||||||
|
|
||||||
|
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||||
|
// The value doesn't matter too much, as long as the key is set.
|
||||||
|
tr->set(migratedKey.contents(), "1"_sr);
|
||||||
|
if (sampleRate.present()) {
|
||||||
|
const double sampleRateDbl =
|
||||||
|
BinaryReader::fromStringRef<double>(sampleRate.get().contents(), Unversioned());
|
||||||
|
Tuple rate = Tuple::makeTuple(sampleRateDbl);
|
||||||
|
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
|
||||||
|
}
|
||||||
|
if (sizeLimit.present()) {
|
||||||
|
const int64_t sizeLimitInt =
|
||||||
|
BinaryReader::fromStringRef<int64_t>(sizeLimit.get().contents(), Unversioned());
|
||||||
|
Tuple size = Tuple::makeTuple(sizeLimitInt);
|
||||||
|
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
|
||||||
|
}
|
||||||
|
|
||||||
|
wait(tr->commit());
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
// Multiple GRV proxies may attempt this migration at the same
|
||||||
|
// time, sometimes resulting in aborts due to conflicts.
|
||||||
|
TraceEvent(SevInfo, "GlobalConfigRetryableMigrationError").errorUnsuppressed(e).suppressFor(1.0);
|
||||||
|
wait(tr->onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Error& e) {
|
||||||
|
// Catch non-retryable errors (and do nothing).
|
||||||
|
TraceEvent(SevWarnAlways, "GlobalConfigMigrationError").error(e);
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Periodically refresh local copy of global configuration.
|
||||||
|
ACTOR Future<Void> globalConfigRefresh(GrvProxyData* grvProxyData, Version* cachedVersion, RangeResult* cachedData) {
|
||||||
|
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(grvProxyData->cx);
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||||
|
state Future<Optional<Value>> globalConfigVersionFuture = tr->get(globalConfigVersionKey);
|
||||||
|
state Future<RangeResult> tmpCachedDataFuture = tr->getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY);
|
||||||
|
state Optional<Value> globalConfigVersion = wait(globalConfigVersionFuture);
|
||||||
|
RangeResult tmpCachedData = wait(tmpCachedDataFuture);
|
||||||
|
*cachedData = tmpCachedData;
|
||||||
|
if (globalConfigVersion.present()) {
|
||||||
|
Version parsedVersion;
|
||||||
|
memcpy(&parsedVersion, globalConfigVersion.get().begin(), sizeof(Version));
|
||||||
|
*cachedVersion = bigEndian64(parsedVersion);
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr->onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle common GlobalConfig transactions on the server side, because not all
|
||||||
|
// clients are allowed to read system keys. Eventually, this could become its
|
||||||
|
// own role.
|
||||||
|
ACTOR Future<Void> globalConfigRequestServer(GrvProxyData* grvProxyData, GrvProxyInterface grvProxy) {
|
||||||
|
state ActorCollection actors(false);
|
||||||
|
state Future<Void> refreshFuture; // so there is only one running attempt
|
||||||
|
state Version cachedVersion = 0;
|
||||||
|
state RangeResult cachedData;
|
||||||
|
|
||||||
|
// Attempt to refresh the configuration database while the migration is
|
||||||
|
// ongoing. This is a small optimization to avoid waiting for the migration
|
||||||
|
// actor to complete.
|
||||||
|
refreshFuture = timeout(globalConfigRefresh(grvProxyData, &cachedVersion, &cachedData),
|
||||||
|
SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_TIMEOUT,
|
||||||
|
Void()) &&
|
||||||
|
delay(SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_INTERVAL);
|
||||||
|
|
||||||
|
// Run one-time migration to support upgrades.
|
||||||
|
wait(success(timeout(globalConfigMigrate(grvProxyData), SERVER_KNOBS->GLOBAL_CONFIG_MIGRATE_TIMEOUT)));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
choose {
|
||||||
|
when(GlobalConfigRefreshRequest refresh = waitNext(grvProxy.refreshGlobalConfig.getFuture())) {
|
||||||
|
// Must have an up to date copy of global configuration in
|
||||||
|
// order to serve it to the client (up to date from the clients
|
||||||
|
// point of view. The client learns the version through a
|
||||||
|
// ClientDBInfo update).
|
||||||
|
if (refresh.lastKnown <= cachedVersion) {
|
||||||
|
refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedData });
|
||||||
|
} else {
|
||||||
|
refresh.reply.sendError(future_version());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
when(wait(refreshFuture)) {
|
||||||
|
refreshFuture = timeout(globalConfigRefresh(grvProxyData, &cachedVersion, &cachedData),
|
||||||
|
SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_TIMEOUT,
|
||||||
|
Void()) &&
|
||||||
|
delay(SERVER_KNOBS->GLOBAL_CONFIG_REFRESH_INTERVAL);
|
||||||
|
}
|
||||||
|
when(wait(actors.getResult())) { ASSERT(false); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get transaction rate info from RateKeeper.
|
// Get transaction rate info from RateKeeper.
|
||||||
ACTOR Future<Void> getRate(UID myID,
|
ACTOR Future<Void> getRate(UID myID,
|
||||||
Reference<AsyncVar<ServerDBInfo> const> db,
|
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||||
|
@ -1012,6 +1135,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
|
||||||
addActor.send(transactionStarter(
|
addActor.send(transactionStarter(
|
||||||
proxy, grvProxyData.db, addActor, &grvProxyData, &healthMetricsReply, &detailedHealthMetricsReply));
|
proxy, grvProxyData.db, addActor, &grvProxyData, &healthMetricsReply, &detailedHealthMetricsReply));
|
||||||
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
|
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
|
||||||
|
addActor.send(globalConfigRequestServer(&grvProxyData, proxy));
|
||||||
|
|
||||||
if (SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION > 0) {
|
if (SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION > 0) {
|
||||||
addActor.send(lastCommitUpdater(&grvProxyData, addActor));
|
addActor.send(lastCommitUpdater(&grvProxyData, addActor));
|
||||||
|
|
|
@ -2217,6 +2217,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
||||||
|
|
||||||
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB); }
|
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_SHARDED_ROCKSDB); }
|
||||||
|
|
||||||
|
bool shardAware() const override { return true; }
|
||||||
|
|
||||||
Future<Void> init() override {
|
Future<Void> init() override {
|
||||||
if (openFuture.isValid()) {
|
if (openFuture.isValid()) {
|
||||||
return openFuture;
|
return openFuture;
|
||||||
|
|
|
@ -61,6 +61,9 @@ public:
|
||||||
class IKeyValueStore : public IClosable {
|
class IKeyValueStore : public IClosable {
|
||||||
public:
|
public:
|
||||||
virtual KeyValueStoreType getType() const = 0;
|
virtual KeyValueStoreType getType() const = 0;
|
||||||
|
// Returns true if the KV store supports shards, i.e., implements addRange(), removeRange(), and
|
||||||
|
// persistRangeMapping().
|
||||||
|
virtual bool shardAware() const { return false; }
|
||||||
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
|
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
|
||||||
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
|
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
|
||||||
virtual Future<Void> canCommit() { return Void(); }
|
virtual Future<Void> canCommit() { return Void(); }
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
*PhysicalShardMove.actor.cpp
|
* PhysicalShardMove.cpp
|
||||||
*
|
*
|
||||||
* This source file is part of the FoundationDB open source project
|
* This source file is part of the FoundationDB open source project
|
||||||
*
|
*
|
||||||
|
@ -43,11 +43,14 @@ std::string printValue(const ErrorOr<Optional<Value>>& value) {
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
struct SSCheckpointWorkload : TestWorkload {
|
struct PhysicalShardMoveWorkLoad : TestWorkload {
|
||||||
|
FlowLock startMoveKeysParallelismLock;
|
||||||
|
FlowLock finishMoveKeysParallelismLock;
|
||||||
|
FlowLock cleanUpDataMoveParallelismLock;
|
||||||
const bool enabled;
|
const bool enabled;
|
||||||
bool pass;
|
bool pass;
|
||||||
|
|
||||||
SSCheckpointWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
|
PhysicalShardMoveWorkLoad(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
|
||||||
|
|
||||||
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
|
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
|
||||||
TraceEvent(SevError, "TestFailed")
|
TraceEvent(SevError, "TestFailed")
|
||||||
|
@ -56,7 +59,7 @@ struct SSCheckpointWorkload : TestWorkload {
|
||||||
pass = false;
|
pass = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string description() const override { return "SSCheckpoint"; }
|
std::string description() const override { return "PhysicalShardMove"; }
|
||||||
|
|
||||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||||
|
|
||||||
|
@ -67,126 +70,153 @@ struct SSCheckpointWorkload : TestWorkload {
|
||||||
return _start(this, cx);
|
return _start(this, cx);
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> _start(SSCheckpointWorkload* self, Database cx) {
|
ACTOR Future<Void> _start(PhysicalShardMoveWorkLoad* self, Database cx) {
|
||||||
state Key key = "TestKey"_sr;
|
|
||||||
state Key endKey = "TestKey0"_sr;
|
|
||||||
state Value oldValue = "TestValue"_sr;
|
|
||||||
state KeyRange testRange = KeyRangeRef(key, endKey);
|
|
||||||
|
|
||||||
int ignore = wait(setDDMode(cx, 0));
|
int ignore = wait(setDDMode(cx, 0));
|
||||||
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
|
state std::map<Key, Value> kvs({ { "TestKeyA"_sr, "TestValueA"_sr },
|
||||||
|
{ "TestKeyB"_sr, "TestValueB"_sr },
|
||||||
|
{ "TestKeyC"_sr, "TestValueC"_sr },
|
||||||
|
{ "TestKeyD"_sr, "TestValueD"_sr },
|
||||||
|
{ "TestKeyE"_sr, "TestValueE"_sr },
|
||||||
|
{ "TestKeyF"_sr, "TestValueF"_sr } });
|
||||||
|
|
||||||
// Create checkpoint.
|
Version _ = wait(self->populateData(self, cx, &kvs));
|
||||||
state Transaction tr(cx);
|
|
||||||
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
|
TraceEvent("TestValueWritten").log();
|
||||||
loop {
|
|
||||||
try {
|
state std::unordered_set<UID> excludes;
|
||||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
state std::unordered_set<UID> includes;
|
||||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
state int teamSize = 1;
|
||||||
wait(createCheckpoint(&tr, testRange, format));
|
std::vector<UID> teamA = wait(self->moveShard(self,
|
||||||
wait(tr.commit());
|
cx,
|
||||||
version = tr.getCommittedVersion();
|
deterministicRandom()->randomUniqueID(),
|
||||||
break;
|
KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr),
|
||||||
} catch (Error& e) {
|
teamSize,
|
||||||
wait(tr.onError(e));
|
includes,
|
||||||
}
|
excludes));
|
||||||
|
excludes.insert(teamA.begin(), teamA.end());
|
||||||
|
|
||||||
|
state uint64_t sh0 = deterministicRandom()->randomUInt64();
|
||||||
|
state uint64_t sh1 = deterministicRandom()->randomUInt64();
|
||||||
|
state uint64_t sh2 = deterministicRandom()->randomUInt64();
|
||||||
|
|
||||||
|
// Move range [TestKeyA, TestKeyB) to sh0.
|
||||||
|
state std::vector<UID> teamA = wait(self->moveShard(self,
|
||||||
|
cx,
|
||||||
|
UID(sh0, deterministicRandom()->randomUInt64()),
|
||||||
|
KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr),
|
||||||
|
teamSize,
|
||||||
|
includes,
|
||||||
|
excludes));
|
||||||
|
includes.insert(teamA.begin(), teamA.end());
|
||||||
|
// Move range [TestKeyB, TestKeyC) to sh1, on the same server.
|
||||||
|
state std::vector<UID> teamB = wait(self->moveShard(self,
|
||||||
|
cx,
|
||||||
|
UID(sh1, deterministicRandom()->randomUInt64()),
|
||||||
|
KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr),
|
||||||
|
teamSize,
|
||||||
|
includes,
|
||||||
|
excludes));
|
||||||
|
ASSERT(std::equal(teamA.begin(), teamA.end(), teamB.begin()));
|
||||||
|
|
||||||
|
state int teamIdx = 0;
|
||||||
|
for (teamIdx = 0; teamIdx < teamA.size(); ++teamIdx) {
|
||||||
|
std::vector<StorageServerShard> shards =
|
||||||
|
wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyA"_sr, "TestKeyC"_sr)));
|
||||||
|
ASSERT(shards.size() == 2);
|
||||||
|
ASSERT(shards[0].desiredId == sh0);
|
||||||
|
ASSERT(shards[1].desiredId == sh1);
|
||||||
|
TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
|
state std::vector<UID> teamC = wait(self->moveShard(self,
|
||||||
|
cx,
|
||||||
|
UID(sh2, deterministicRandom()->randomUInt64()),
|
||||||
|
KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr),
|
||||||
|
teamSize,
|
||||||
|
includes,
|
||||||
|
excludes));
|
||||||
|
ASSERT(std::equal(teamA.begin(), teamA.end(), teamC.begin()));
|
||||||
|
|
||||||
// Fetch checkpoint meta data.
|
for (teamIdx = 0; teamIdx < teamA.size(); ++teamIdx) {
|
||||||
loop {
|
std::vector<StorageServerShard> shards =
|
||||||
try {
|
wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyA"_sr, "TestKeyC"_sr)));
|
||||||
state std::vector<CheckpointMetaData> records =
|
ASSERT(shards.size() == 2);
|
||||||
wait(getCheckpointMetaData(cx, testRange, version, format));
|
ASSERT(shards[0].desiredId == sh0);
|
||||||
break;
|
ASSERT(shards[1].id == sh1);
|
||||||
} catch (Error& e) {
|
ASSERT(shards[1].desiredId == sh2);
|
||||||
TraceEvent("TestFetchCheckpointMetadataError")
|
TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
|
||||||
.errorUnsuppressed(e)
|
|
||||||
.detail("Range", testRange)
|
|
||||||
.detail("Version", version);
|
|
||||||
|
|
||||||
// The checkpoint was just created, we don't expect this error.
|
|
||||||
ASSERT(e.code() != error_code_checkpoint_not_found);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent("TestCheckpointFetched")
|
wait(self->validateData(self, cx, KeyRangeRef("TestKeyA"_sr, "TestKeyF"_sr), &kvs));
|
||||||
.detail("Range", testRange)
|
TraceEvent("TestValueVerified").log();
|
||||||
.detail("Version", version)
|
|
||||||
.detail("Checkpoints", describe(records));
|
|
||||||
|
|
||||||
state std::string pwd = platform::getWorkingDirectory();
|
|
||||||
state std::string folder = pwd + "/checkpoints";
|
|
||||||
platform::eraseDirectoryRecursive(folder);
|
|
||||||
ASSERT(platform::createDirectory(folder));
|
|
||||||
|
|
||||||
// Fetch checkpoint.
|
|
||||||
state std::vector<CheckpointMetaData> fetchedCheckpoints;
|
|
||||||
state int i = 0;
|
|
||||||
for (; i < records.size(); ++i) {
|
|
||||||
loop {
|
|
||||||
TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
|
|
||||||
try {
|
|
||||||
state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder));
|
|
||||||
fetchedCheckpoints.push_back(record);
|
|
||||||
TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent("TestFetchCheckpointError")
|
|
||||||
.errorUnsuppressed(e)
|
|
||||||
.detail("Checkpoint", records[i].toString());
|
|
||||||
wait(delay(1));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
|
|
||||||
platform::eraseDirectoryRecursive(rocksDBTestDir);
|
|
||||||
|
|
||||||
// Restore KVS.
|
|
||||||
state IKeyValueStore* kvStore = keyValueStoreRocksDB(
|
|
||||||
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
|
|
||||||
wait(kvStore->init());
|
|
||||||
try {
|
|
||||||
wait(kvStore->restore(fetchedCheckpoints));
|
|
||||||
} catch (Error& e) {
|
|
||||||
TraceEvent(SevError, "TestRestoreCheckpointError")
|
|
||||||
.errorUnsuppressed(e)
|
|
||||||
.detail("Checkpoint", describe(records));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compare the keyrange between the original database and the one restored from checkpoint.
|
|
||||||
// For now, it should have been a single key.
|
|
||||||
tr.reset();
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
||||||
state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
|
|
||||||
break;
|
|
||||||
} catch (Error& e) {
|
|
||||||
wait(tr.onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
RangeResult kvRange = wait(kvStore->readRange(testRange));
|
|
||||||
ASSERT(res.size() == kvRange.size());
|
|
||||||
for (int i = 0; i < res.size(); ++i) {
|
|
||||||
ASSERT(res[i] == kvRange[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
int ignore = wait(setDDMode(cx, 1));
|
int ignore = wait(setDDMode(cx, 1));
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> readAndVerify(SSCheckpointWorkload* self,
|
ACTOR Future<Version> populateData(PhysicalShardMoveWorkLoad* self, Database cx, std::map<Key, Value>* kvs) {
|
||||||
|
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||||
|
state Version version;
|
||||||
|
loop {
|
||||||
|
state UID debugID = deterministicRandom()->randomUniqueID();
|
||||||
|
try {
|
||||||
|
tr->debugTransaction(debugID);
|
||||||
|
for (const auto& [key, value] : *kvs) {
|
||||||
|
tr->set(key, value);
|
||||||
|
}
|
||||||
|
wait(tr->commit());
|
||||||
|
version = tr->getCommittedVersion();
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
TraceEvent("TestCommitError").errorUnsuppressed(e);
|
||||||
|
wait(tr->onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TraceEvent("PopulateTestDataDone")
|
||||||
|
.detail("CommitVersion", tr->getCommittedVersion())
|
||||||
|
.detail("DebugID", debugID);
|
||||||
|
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> validateData(PhysicalShardMoveWorkLoad* self,
|
||||||
|
Database cx,
|
||||||
|
KeyRange range,
|
||||||
|
std::map<Key, Value>* kvs) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
loop {
|
||||||
|
state UID debugID = deterministicRandom()->randomUniqueID();
|
||||||
|
try {
|
||||||
|
tr.debugTransaction(debugID);
|
||||||
|
RangeResult res = wait(tr.getRange(range, CLIENT_KNOBS->TOO_MANY));
|
||||||
|
ASSERT(!res.more && res.size() < CLIENT_KNOBS->TOO_MANY);
|
||||||
|
|
||||||
|
for (const auto& kv : res) {
|
||||||
|
ASSERT((*kvs)[kv.key] == kv.value);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
TraceEvent("TestCommitError").errorUnsuppressed(e);
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TraceEvent("ValidateTestDataDone").detail("DebugID", debugID);
|
||||||
|
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> readAndVerify(PhysicalShardMoveWorkLoad* self,
|
||||||
Database cx,
|
Database cx,
|
||||||
Key key,
|
Key key,
|
||||||
ErrorOr<Optional<Value>> expectedValue) {
|
ErrorOr<Optional<Value>> expectedValue) {
|
||||||
state Transaction tr(cx);
|
state Transaction tr(cx);
|
||||||
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
|
state Version readVersion = wait(tr.getReadVersion());
|
||||||
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
|
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
|
||||||
const bool equal = !expectedValue.isError() && res == expectedValue.get();
|
const bool equal = !expectedValue.isError() && res == expectedValue.get();
|
||||||
if (!equal) {
|
if (!equal) {
|
||||||
|
@ -194,6 +224,7 @@ struct SSCheckpointWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
|
TraceEvent("TestReadError").errorUnsuppressed(e);
|
||||||
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
|
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -201,35 +232,146 @@ struct SSCheckpointWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TraceEvent("TestReadSuccess").detail("Version", readVersion);
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Version> writeAndVerify(SSCheckpointWorkload* self, Database cx, Key key, Optional<Value> value) {
|
ACTOR Future<Version> writeAndVerify(PhysicalShardMoveWorkLoad* self, Database cx, Key key, Optional<Value> value) {
|
||||||
state Transaction tr(cx);
|
// state Transaction tr(cx);
|
||||||
|
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||||
state Version version;
|
state Version version;
|
||||||
loop {
|
loop {
|
||||||
|
state UID debugID = deterministicRandom()->randomUniqueID();
|
||||||
try {
|
try {
|
||||||
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
tr->debugTransaction(debugID);
|
||||||
if (value.present()) {
|
if (value.present()) {
|
||||||
tr.set(key, value.get());
|
tr->set(key, value.get());
|
||||||
|
tr->set("Test?"_sr, value.get());
|
||||||
|
tr->set(key, value.get());
|
||||||
} else {
|
} else {
|
||||||
tr.clear(key);
|
tr->clear(key);
|
||||||
}
|
}
|
||||||
wait(timeoutError(tr.commit(), 30.0));
|
wait(timeoutError(tr->commit(), 30.0));
|
||||||
version = tr.getCommittedVersion();
|
version = tr->getCommittedVersion();
|
||||||
break;
|
break;
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
wait(tr.onError(e));
|
TraceEvent("TestCommitError").errorUnsuppressed(e);
|
||||||
|
wait(tr->onError(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TraceEvent("TestCommitSuccess").detail("CommitVersion", tr->getCommittedVersion()).detail("DebugID", debugID);
|
||||||
|
|
||||||
wait(self->readAndVerify(self, cx, key, value));
|
wait(self->readAndVerify(self, cx, key, value));
|
||||||
|
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Move keys to a random selected team consisting of a single SS, after disabling DD, so that keys won't be
|
||||||
|
// kept in the new team until DD is enabled.
|
||||||
|
// Returns the address of the single SS of the new team.
|
||||||
|
ACTOR Future<std::vector<UID>> moveShard(PhysicalShardMoveWorkLoad* self,
|
||||||
|
Database cx,
|
||||||
|
UID dataMoveId,
|
||||||
|
KeyRange keys,
|
||||||
|
int teamSize,
|
||||||
|
std::unordered_set<UID> includes,
|
||||||
|
std::unordered_set<UID> excludes) {
|
||||||
|
// Disable DD to avoid DD undoing of our move.
|
||||||
|
int ignore = wait(setDDMode(cx, 0));
|
||||||
|
|
||||||
|
// Pick a random SS as the dest, keys will reside on a single server after the move.
|
||||||
|
std::vector<StorageServerInterface> interfs = wait(getStorageServers(cx));
|
||||||
|
ASSERT(interfs.size() > teamSize - includes.size());
|
||||||
|
while (includes.size() < teamSize) {
|
||||||
|
const auto& interf = interfs[deterministicRandom()->randomInt(0, interfs.size())];
|
||||||
|
if (excludes.count(interf.uniqueID) == 0 && includes.count(interf.uniqueID) == 0) {
|
||||||
|
includes.insert(interf.uniqueID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
state std::vector<UID> dests(includes.begin(), includes.end());
|
||||||
|
state UID owner = deterministicRandom()->randomUniqueID();
|
||||||
|
// state Key ownerKey = "\xff/moveKeysLock/Owner"_sr;
|
||||||
|
state DDEnabledState ddEnabledState;
|
||||||
|
|
||||||
|
state Transaction tr(cx);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
TraceEvent("TestMoveShard").detail("Range", keys.toString());
|
||||||
|
state MoveKeysLock moveKeysLock = wait(takeMoveKeysLock(cx, owner));
|
||||||
|
|
||||||
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
state RangeResult dataMoves = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
|
||||||
|
Version readVersion = wait(tr.getReadVersion());
|
||||||
|
TraceEvent("TestMoveShardReadDataMoves")
|
||||||
|
.detail("DataMoves", dataMoves.size())
|
||||||
|
.detail("ReadVersion", readVersion);
|
||||||
|
state int i = 0;
|
||||||
|
for (; i < dataMoves.size(); ++i) {
|
||||||
|
UID dataMoveId = decodeDataMoveKey(dataMoves[i].key);
|
||||||
|
state DataMoveMetaData dataMove = decodeDataMoveValue(dataMoves[i].value);
|
||||||
|
ASSERT(dataMoveId == dataMove.id);
|
||||||
|
TraceEvent("TestCancelDataMoveBegin").detail("DataMove", dataMove.toString());
|
||||||
|
wait(cleanUpDataMove(cx,
|
||||||
|
dataMoveId,
|
||||||
|
moveKeysLock,
|
||||||
|
&self->cleanUpDataMoveParallelismLock,
|
||||||
|
dataMove.range,
|
||||||
|
&ddEnabledState));
|
||||||
|
TraceEvent("TestCancelDataMoveEnd").detail("DataMove", dataMove.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
wait(moveKeys(cx,
|
||||||
|
dataMoveId,
|
||||||
|
keys,
|
||||||
|
dests,
|
||||||
|
dests,
|
||||||
|
moveKeysLock,
|
||||||
|
Promise<Void>(),
|
||||||
|
&self->startMoveKeysParallelismLock,
|
||||||
|
&self->finishMoveKeysParallelismLock,
|
||||||
|
false,
|
||||||
|
deterministicRandom()->randomUniqueID(), // for logging only
|
||||||
|
&ddEnabledState));
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
if (e.code() == error_code_movekeys_conflict) {
|
||||||
|
// Conflict on moveKeysLocks with the current running DD is expected, just retry.
|
||||||
|
tr.reset();
|
||||||
|
} else {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TraceEvent("TestMoveShardComplete").detail("Range", keys.toString()).detail("NewTeam", describe(dests));
|
||||||
|
|
||||||
|
return dests;
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<std::vector<StorageServerShard>> getStorageServerShards(Database cx, UID ssId, KeyRange range) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
Optional<Value> serverListValue = wait(tr.get(serverListKeyFor(ssId)));
|
||||||
|
ASSERT(serverListValue.present());
|
||||||
|
state StorageServerInterface ssi = decodeServerListValue(serverListValue.get());
|
||||||
|
GetShardStateRequest req(range, GetShardStateRequest::READABLE, true);
|
||||||
|
GetShardStateReply rep = wait(ssi.getShardState.getReply(req, TaskPriority::DefaultEndpoint));
|
||||||
|
return rep.shards;
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Future<bool> check(Database const& cx) override { return pass; }
|
Future<bool> check(Database const& cx) override { return pass; }
|
||||||
|
|
||||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
WorkloadFactory<SSCheckpointWorkload> SSCheckpointWorkloadFactory("SSCheckpointWorkload");
|
WorkloadFactory<PhysicalShardMoveWorkLoad> PhysicalShardMoveWorkLoadFactory("PhysicalShardMove");
|
|
@ -0,0 +1,238 @@
|
||||||
|
/*
|
||||||
|
*PhysicalShardMove.actor.cpp
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "fdbclient/ManagementAPI.actor.h"
|
||||||
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
|
#include "fdbrpc/simulator.h"
|
||||||
|
#include "fdbserver/IKeyValueStore.h"
|
||||||
|
#include "fdbserver/ServerCheckpoint.actor.h"
|
||||||
|
#include "fdbserver/MoveKeys.actor.h"
|
||||||
|
#include "fdbserver/QuietDatabase.h"
|
||||||
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
|
#include "flow/Error.h"
|
||||||
|
#include "flow/IRandom.h"
|
||||||
|
#include "flow/flow.h"
|
||||||
|
#include <cstdint>
|
||||||
|
#include <limits>
|
||||||
|
|
||||||
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
std::string printValue(const ErrorOr<Optional<Value>>& value) {
|
||||||
|
if (value.isError()) {
|
||||||
|
return value.getError().name();
|
||||||
|
}
|
||||||
|
return value.get().present() ? value.get().get().toString() : "Value Not Found.";
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
struct SSCheckpointRestoreWorkload : TestWorkload {
|
||||||
|
const bool enabled;
|
||||||
|
bool pass;
|
||||||
|
|
||||||
|
SSCheckpointRestoreWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
|
||||||
|
|
||||||
|
void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
|
||||||
|
TraceEvent(SevError, "TestFailed")
|
||||||
|
.detail("ExpectedValue", printValue(expectedValue))
|
||||||
|
.detail("ActualValue", printValue(actualValue));
|
||||||
|
pass = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string description() const override { return "SSCheckpoint"; }
|
||||||
|
|
||||||
|
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||||
|
|
||||||
|
Future<Void> start(Database const& cx) override {
|
||||||
|
if (!enabled) {
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
return _start(this, cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> _start(SSCheckpointRestoreWorkload* self, Database cx) {
|
||||||
|
state Key key = "TestKey"_sr;
|
||||||
|
state Key endKey = "TestKey0"_sr;
|
||||||
|
state Value oldValue = "TestValue"_sr;
|
||||||
|
state KeyRange testRange = KeyRangeRef(key, endKey);
|
||||||
|
|
||||||
|
int ignore = wait(setDDMode(cx, 0));
|
||||||
|
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
|
||||||
|
|
||||||
|
// Create checkpoint.
|
||||||
|
state Transaction tr(cx);
|
||||||
|
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
wait(createCheckpoint(&tr, testRange, format));
|
||||||
|
wait(tr.commit());
|
||||||
|
version = tr.getCommittedVersion();
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
|
||||||
|
|
||||||
|
// Fetch checkpoint meta data.
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
state std::vector<CheckpointMetaData> records =
|
||||||
|
wait(getCheckpointMetaData(cx, testRange, version, format));
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
TraceEvent("TestFetchCheckpointMetadataError")
|
||||||
|
.errorUnsuppressed(e)
|
||||||
|
.detail("Range", testRange)
|
||||||
|
.detail("Version", version);
|
||||||
|
|
||||||
|
// The checkpoint was just created, we don't expect this error.
|
||||||
|
ASSERT(e.code() != error_code_checkpoint_not_found);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TraceEvent("TestCheckpointFetched")
|
||||||
|
.detail("Range", testRange)
|
||||||
|
.detail("Version", version)
|
||||||
|
.detail("Checkpoints", describe(records));
|
||||||
|
|
||||||
|
state std::string pwd = platform::getWorkingDirectory();
|
||||||
|
state std::string folder = pwd + "/checkpoints";
|
||||||
|
platform::eraseDirectoryRecursive(folder);
|
||||||
|
ASSERT(platform::createDirectory(folder));
|
||||||
|
|
||||||
|
// Fetch checkpoint.
|
||||||
|
state std::vector<CheckpointMetaData> fetchedCheckpoints;
|
||||||
|
state int i = 0;
|
||||||
|
for (; i < records.size(); ++i) {
|
||||||
|
loop {
|
||||||
|
TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
|
||||||
|
try {
|
||||||
|
state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder));
|
||||||
|
fetchedCheckpoints.push_back(record);
|
||||||
|
TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
TraceEvent("TestFetchCheckpointError")
|
||||||
|
.errorUnsuppressed(e)
|
||||||
|
.detail("Checkpoint", records[i].toString());
|
||||||
|
wait(delay(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
|
||||||
|
platform::eraseDirectoryRecursive(rocksDBTestDir);
|
||||||
|
|
||||||
|
// Restore KVS.
|
||||||
|
state IKeyValueStore* kvStore = keyValueStoreRocksDB(
|
||||||
|
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
|
||||||
|
wait(kvStore->init());
|
||||||
|
try {
|
||||||
|
wait(kvStore->restore(fetchedCheckpoints));
|
||||||
|
} catch (Error& e) {
|
||||||
|
TraceEvent(SevError, "TestRestoreCheckpointError")
|
||||||
|
.errorUnsuppressed(e)
|
||||||
|
.detail("Checkpoint", describe(records));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compare the keyrange between the original database and the one restored from checkpoint.
|
||||||
|
// For now, it should have been a single key.
|
||||||
|
tr.reset();
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RangeResult kvRange = wait(kvStore->readRange(testRange));
|
||||||
|
ASSERT(res.size() == kvRange.size());
|
||||||
|
for (int i = 0; i < res.size(); ++i) {
|
||||||
|
ASSERT(res[i] == kvRange[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int ignore = wait(setDDMode(cx, 1));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> readAndVerify(SSCheckpointRestoreWorkload* self,
|
||||||
|
Database cx,
|
||||||
|
Key key,
|
||||||
|
ErrorOr<Optional<Value>> expectedValue) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
|
||||||
|
const bool equal = !expectedValue.isError() && res == expectedValue.get();
|
||||||
|
if (!equal) {
|
||||||
|
self->validationFailed(expectedValue, ErrorOr<Optional<Value>>(res));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Version> writeAndVerify(SSCheckpointRestoreWorkload* self,
|
||||||
|
Database cx,
|
||||||
|
Key key,
|
||||||
|
Optional<Value> value) {
|
||||||
|
state Transaction tr(cx);
|
||||||
|
state Version version;
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
if (value.present()) {
|
||||||
|
tr.set(key, value.get());
|
||||||
|
} else {
|
||||||
|
tr.clear(key);
|
||||||
|
}
|
||||||
|
wait(timeoutError(tr.commit(), 30.0));
|
||||||
|
version = tr.getCommittedVersion();
|
||||||
|
break;
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr.onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wait(self->readAndVerify(self, cx, key, value));
|
||||||
|
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<bool> check(Database const& cx) override { return pass; }
|
||||||
|
|
||||||
|
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||||
|
};
|
||||||
|
|
||||||
|
WorkloadFactory<SSCheckpointRestoreWorkload> SSCheckpointRestoreWorkloadFactory("SSCheckpointRestoreWorkload");
|
|
@ -192,10 +192,12 @@ if(WITH_PYTHON)
|
||||||
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
|
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml)
|
||||||
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
|
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
|
||||||
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
|
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
|
||||||
|
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml)
|
||||||
else()
|
else()
|
||||||
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml IGNORE)
|
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml IGNORE)
|
||||||
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml IGNORE)
|
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml IGNORE)
|
||||||
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml IGNORE)
|
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml IGNORE)
|
||||||
|
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml IGNORE)
|
||||||
endif()
|
endif()
|
||||||
add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
|
add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
|
||||||
add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
|
add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
|
||||||
|
|
|
@ -6,9 +6,13 @@ coordinators = 3
|
||||||
machineCount = 15
|
machineCount = 15
|
||||||
allowDefaultTenant = false
|
allowDefaultTenant = false
|
||||||
|
|
||||||
|
[[knobs]]
|
||||||
|
shard_encode_location_metadata = true
|
||||||
|
storage_server_shard_aware = true
|
||||||
|
|
||||||
[[test]]
|
[[test]]
|
||||||
testTitle = 'PhysicalShardMove'
|
testTitle = 'PhysicalShardMove'
|
||||||
useDB = true
|
useDB = true
|
||||||
|
|
||||||
[[test.workload]]
|
[[test.workload]]
|
||||||
testName = 'SSCheckpointWorkload'
|
testName = 'PhysicalShardMove'
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
[configuration]
|
||||||
|
config = 'triple'
|
||||||
|
storageEngineType = 4
|
||||||
|
processesPerMachine = 1
|
||||||
|
coordinators = 3
|
||||||
|
machineCount = 15
|
||||||
|
allowDefaultTenant = false
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
testTitle = 'SSCheckpointRestoreWorkload'
|
||||||
|
useDB = true
|
||||||
|
|
||||||
|
[[test.workload]]
|
||||||
|
testName = 'SSCheckpointRestoreWorkload'
|
Loading…
Reference in New Issue