Open Database as switchable only for client

This commit is contained in:
Andrew Noyes 2019-05-24 10:51:08 -07:00
parent d4578a49d8
commit bc03421d05
3 changed files with 45 additions and 27 deletions

View File

@ -54,11 +54,17 @@ public:
// For internal (fdbserver) use only // For internal (fdbserver) use only
static Database create( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality ); static Database create( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality );
static Database create( Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID=TaskDefaultEndpoint, bool lockAware=false, int apiVersion=Database::API_VERSION_LATEST ); static Database create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
LocalityData clientLocality, bool enableLocalityLoadBalance,
int taskID = TaskDefaultEndpoint, bool lockAware = false,
int apiVersion = Database::API_VERSION_LATEST, bool switchable = false);
~DatabaseContext(); ~DatabaseContext();
Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion )); } Database clone() const {
return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality,
enableLocalityLoadBalance, lockAware, apiVersion, switchable));
}
std::pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false ); std::pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse ); bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse );
@ -104,11 +110,13 @@ public:
// new cluster. // new cluster.
Future<Void> switchConnectionFile(Reference<ClusterConnectionFile> standby); Future<Void> switchConnectionFile(Reference<ClusterConnectionFile> standby);
Future<Void> recreateWatches(); Future<Void> recreateWatches();
bool switchable = false;
// private: // private:
explicit DatabaseContext( Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientDBInfo, explicit DatabaseContext(Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, int taskID, LocalityData const& clientLocality, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, int taskID,
bool enableLocalityLoadBalance, bool lockAware, int apiVersion = Database::API_VERSION_LATEST ); LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware,
int apiVersion = Database::API_VERSION_LATEST, bool switchable = false);
explicit DatabaseContext( const Error &err ); explicit DatabaseContext( const Error &err );

View File

@ -174,13 +174,13 @@ std::string unprintable( std::string const& val ) {
} }
void DatabaseContext::validateVersion(Version version) { void DatabaseContext::validateVersion(Version version) {
// Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any reads. // Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any
// We throw client_invalid_operation because the caller didn't directly set the version, so the version_invalid error // reads. We throw client_invalid_operation because the caller didn't directly set the version, so the
// might be confusing. // version_invalid error might be confusing.
if(version == 0) { if (version == 0) {
throw client_invalid_operation(); throw client_invalid_operation();
} }
if (version < minAcceptableReadVersion) { if (switchable && version < minAcceptableReadVersion) {
TEST(true); // Attempted to read a version lower than any this client has seen from the current cluster TEST(true); // Attempted to read a version lower than any this client has seen from the current cluster
throw transaction_too_old(); throw transaction_too_old();
} }
@ -511,17 +511,20 @@ Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
return getHealthMetricsActor(this, detailed); return getHealthMetricsActor(this, detailed);
} }
DatabaseContext::DatabaseContext( DatabaseContext::DatabaseContext(Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, int taskID,
int taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion ) LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware,
: cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), dbId(dbId), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), int apiVersion, bool switchable)
lockAware(lockAware), apiVersion(apiVersion), provisional(false), : cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), dbId(dbId), taskID(taskID),
transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), apiVersion(apiVersion), switchable(switchable), provisional(false), transactionReadVersions(0),
transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0), transactionCommittedMutationBytes(0), transactionsCommitStarted(0), transactionsCommitCompleted(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0) transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0),
{ transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0),
transactionMaxRetries(-1), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000),
mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
detailedHealthMetricsLastUpdated(0) {
metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE); metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE);
maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES; maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
@ -605,8 +608,12 @@ Database DatabaseContext::create(Reference<AsyncVar<Optional<ClusterInterface>>>
return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false)); return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false));
} }
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID, bool lockAware, int apiVersion) { Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
return Database( new DatabaseContext( Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion ) ); LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID,
bool lockAware, int apiVersion, bool switchable) {
return Database(new DatabaseContext(Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor,
LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance,
lockAware, apiVersion, switchable));
} }
DatabaseContext::~DatabaseContext() { DatabaseContext::~DatabaseContext() {
@ -818,6 +825,7 @@ Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
} }
Future<Void> DatabaseContext::switchConnectionFile(Reference<ClusterConnectionFile> standby) { Future<Void> DatabaseContext::switchConnectionFile(Reference<ClusterConnectionFile> standby) {
ASSERT(switchable);
return switchConnectionFileImpl(standby, this); return switchConnectionFileImpl(standby, this);
} }
@ -834,10 +842,13 @@ Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, in
DatabaseContext *db; DatabaseContext *db;
if(preallocatedDb) { if(preallocatedDb) {
db = new (preallocatedDb) DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion); db = new (preallocatedDb)
DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint,
clientLocality, true, false, apiVersion, /* switchable */ true);
} }
else { else {
db = new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion); db = new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint,
clientLocality, true, false, apiVersion, /* switchable */ true);
} }
return Database(db); return Database(db);

View File

@ -1831,8 +1831,7 @@ ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version versi
if( *isTooOld ) if( *isTooOld )
throw transaction_too_old(); throw transaction_too_old();
cx->minAcceptableReadVersion = ASSERT(!cx->switchable);
std::min(cx->minAcceptableReadVersion, version); // Suppress minAcceptableReadVersion check
tr.setVersion( version ); tr.setVersion( version );
limits.minRows = 0; limits.minRows = 0;